diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go index 4774c8820..31e9a4c73 100644 --- a/federationapi/federationapi_keys_test.go +++ b/federationapi/federationapi_keys_test.go @@ -102,7 +102,7 @@ func TestMain(m *testing.M) { ) // Finally, build the server key APIs. - sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics) + sbase := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics) s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true) } diff --git a/setup/base/base.go b/setup/base/base.go index 0e7528a03..5cbd7da9c 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -86,6 +86,7 @@ type BaseDendrite struct { DNSCache *gomatrixserverlib.DNSCache Database *sql.DB DatabaseWriter sqlutil.Writer + EnableMetrics bool } const NoListener = "" @@ -96,7 +97,7 @@ const HTTPClientTimeout = time.Second * 30 type BaseDendriteOptions int const ( - NoCacheMetrics BaseDendriteOptions = iota + DisableMetrics BaseDendriteOptions = iota UseHTTPAPIs PolylithMode ) @@ -107,12 +108,12 @@ const ( func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...BaseDendriteOptions) *BaseDendrite { platformSanityChecks() useHTTPAPIs := false - cacheMetrics := true + enableMetrics := true isMonolith := true for _, opt := range options { switch opt { - case NoCacheMetrics: - cacheMetrics = false + case DisableMetrics: + enableMetrics = false case UseHTTPAPIs: useHTTPAPIs = true case PolylithMode: @@ -160,7 +161,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base } } - cache, err := caching.NewInMemoryLRUCache(cacheMetrics) + cache, err := caching.NewInMemoryLRUCache(enableMetrics) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") } @@ -246,6 +247,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base apiHttpClient: &apiClient, Database: db, // set if monolith with global connection pool only DatabaseWriter: writer, // set if monolith with global connection pool only + EnableMetrics: enableMetrics, } } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 426f02bb6..248b0e656 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -13,6 +13,7 @@ import ( "github.com/sirupsen/logrus" natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" natsclient "github.com/nats-io/nats.go" ) @@ -21,6 +22,13 @@ type NATSInstance struct { sync.Mutex } +func DeleteAllStreams(js nats.JetStreamContext, cfg *config.JetStream) { + for _, stream := range streams { // streams are defined in streams.go + name := cfg.Prefixed(stream.Name) + _ = js.DeleteStream(name) + } +} + func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) { // check if we need an in-process NATS Server if len(cfg.Addresses) != 0 { diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 99d1e40c3..8ab130911 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -65,11 +65,13 @@ func NewRequestPool( userAPI userapi.SyncUserAPI, keyAPI keyapi.SyncKeyAPI, rsAPI roomserverAPI.SyncRoomserverAPI, streams *streams.Streams, notifier *notifier.Notifier, - producer PresencePublisher, + producer PresencePublisher, enableMetrics bool, ) *RequestPool { - prometheus.MustRegister( - activeSyncRequests, waitingSyncRequests, - ) + if enableMetrics { + prometheus.MustRegister( + activeSyncRequests, waitingSyncRequests, + ) + } rp := &RequestPool{ db: db, cfg: cfg, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index dbc6e240c..d8bacb2da 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -65,7 +65,7 @@ func AddPublicRoutes( JetStream: js, } - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer) + requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer, base.EnableMetrics) userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{ JetStream: js, diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go new file mode 100644 index 000000000..12b5178d8 --- /dev/null +++ b/syncapi/syncapi_test.go @@ -0,0 +1,162 @@ +package syncapi + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/test" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/nats-io/nats.go" +) + +type syncRoomserverAPI struct { + rsapi.SyncRoomserverAPI + rooms []*test.Room +} + +func (s *syncRoomserverAPI) QueryLatestEventsAndState(ctx context.Context, req *rsapi.QueryLatestEventsAndStateRequest, res *rsapi.QueryLatestEventsAndStateResponse) error { + var room *test.Room + for _, r := range s.rooms { + if r.ID == req.RoomID { + room = r + break + } + } + if room == nil { + res.RoomExists = false + return nil + } + res.RoomVersion = room.Version + return nil // TODO: return state +} + +type syncUserAPI struct { + userapi.SyncUserAPI + accounts []userapi.Device +} + +func (s *syncUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error { + for _, acc := range s.accounts { + if acc.AccessToken == req.AccessToken { + res.Device = &acc + return nil + } + } + res.Err = "unknown user" + return nil +} + +func (s *syncUserAPI) PerformLastSeenUpdate(ctx context.Context, req *userapi.PerformLastSeenUpdateRequest, res *userapi.PerformLastSeenUpdateResponse) error { + return nil +} + +type syncKeyAPI struct { + keyapi.KeyInternalAPI +} + +func TestSyncAPI(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + testSync(t, dbType) + }) +} + +func testSync(t *testing.T, dbType test.DBType) { + user := test.NewUser() + room := test.NewRoom(t, user) + alice := userapi.Device{ + ID: "ALICEID", + UserID: user.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "Alice", + AccountType: userapi.AccountTypeUser, + } + + base, close := test.CreateBaseDendrite(t, dbType) + defer close() + + jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) + var msgs []*nats.Msg + for _, ev := range room.Events() { + var addsStateIDs []string + if ev.StateKey() != nil { + addsStateIDs = append(addsStateIDs, ev.EventID()) + } + msgs = append(msgs, test.NewOutputEventMsg(t, base, room.ID, api.OutputEvent{ + Type: rsapi.OutputTypeNewRoomEvent, + NewRoomEvent: &rsapi.OutputNewRoomEvent{ + Event: ev, + AddsStateEventIDs: addsStateIDs, + }, + })) + } + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, &syncKeyAPI{}) + test.MustPublishMsgs(t, jsctx, msgs...) + + testCases := []struct { + name string + req *http.Request + wantCode int + wantJoinedRooms []string + }{ + { + name: "missing access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "timeout": "0", + })), + wantCode: 401, + }, + { + name: "unknown access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": "foo", + "timeout": "0", + })), + wantCode: 401, + }, + { + name: "valid access token", + req: test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": alice.AccessToken, + "timeout": "0", + })), + wantCode: 200, + wantJoinedRooms: []string{room.ID}, + }, + } + // TODO: find a better way + time.Sleep(500 * time.Millisecond) + + for _, tc := range testCases { + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, tc.req) + if w.Code != tc.wantCode { + t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) + } + if tc.wantJoinedRooms != nil { + var res types.Response + if err := json.NewDecoder(w.Body).Decode(&res); err != nil { + t.Fatalf("%s: failed to decode response body: %s", tc.name, err) + } + if len(res.Rooms.Join) != len(tc.wantJoinedRooms) { + t.Errorf("%s: got %v joined rooms, want %v.\nResponse: %+v", tc.name, len(res.Rooms.Join), len(tc.wantJoinedRooms), res) + } + t.Logf("res: %+v", res.Rooms.Join[room.ID]) + + gotEventIDs := make([]string, len(res.Rooms.Join[room.ID].Timeline.Events)) + for i, ev := range res.Rooms.Join[room.ID].Timeline.Events { + gotEventIDs[i] = ev.EventID + } + test.AssertEventIDsEqual(t, gotEventIDs, room.Events()) + } + } +} diff --git a/test/base.go b/test/base.go index 32fc8dc53..664442c03 100644 --- a/test/base.go +++ b/test/base.go @@ -1,11 +1,83 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package test import ( + "errors" + "fmt" + "io/fs" + "os" + "strings" + "testing" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/nats-io/nats.go" ) +func CreateBaseDendrite(t *testing.T, dbType DBType) (*base.BaseDendrite, func()) { + var cfg config.Dendrite + cfg.Defaults(false) + cfg.Global.JetStream.InMemory = true + + switch dbType { + case DBTypePostgres: + cfg.Global.Defaults(true) // autogen a signing key + cfg.MediaAPI.Defaults(true) // autogen a media path + // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use + // the file system event with InMemory=true :( + cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) + connStr, close := PrepareDBConnectionString(t, dbType) + cfg.Global.DatabaseOptions = config.DatabaseOptions{ + ConnectionString: config.DataSource(connStr), + MaxOpenConnections: 10, + MaxIdleConnections: 2, + ConnMaxLifetimeSeconds: 60, + } + return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), close + case DBTypeSQLite: + cfg.Defaults(true) // sets a sqlite db per component + // use a distinct prefix else concurrent postgres/sqlite runs will clash since NATS will use + // the file system event with InMemory=true :( + cfg.Global.JetStream.TopicPrefix = fmt.Sprintf("Test_%d_", dbType) + return base.NewBaseDendrite(&cfg, "Test", base.DisableMetrics), func() { + // cleanup db files. This risks getting out of sync as we add more database strings :( + dbFiles := []config.DataSource{ + cfg.AppServiceAPI.Database.ConnectionString, + cfg.FederationAPI.Database.ConnectionString, + cfg.KeyServer.Database.ConnectionString, + cfg.MSCs.Database.ConnectionString, + cfg.MediaAPI.Database.ConnectionString, + cfg.RoomServer.Database.ConnectionString, + cfg.SyncAPI.Database.ConnectionString, + cfg.UserAPI.AccountDatabase.ConnectionString, + } + for _, fileURI := range dbFiles { + path := strings.TrimPrefix(string(fileURI), "file:") + err := os.Remove(path) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to cleanup sqlite db '%s': %s", fileURI, err) + } + } + } + default: + t.Fatalf("unknown db type: %v", dbType) + } + return nil, nil +} + func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nats.Conn) { if cfg == nil { cfg = &config.Dendrite{} diff --git a/test/http.go b/test/http.go new file mode 100644 index 000000000..a458a3385 --- /dev/null +++ b/test/http.go @@ -0,0 +1,45 @@ +package test + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + "testing" +) + +type HTTPRequestOpt func(req *http.Request) + +func WithJSONBody(t *testing.T, body interface{}) HTTPRequestOpt { + t.Helper() + b, err := json.Marshal(body) + if err != nil { + t.Fatalf("WithJSONBody: %s", err) + } + return func(req *http.Request) { + req.Body = io.NopCloser(bytes.NewBuffer(b)) + } +} + +func WithQueryParams(qps map[string]string) HTTPRequestOpt { + var vals url.Values = map[string][]string{} + for k, v := range qps { + vals.Set(k, v) + } + return func(req *http.Request) { + req.URL.RawQuery = vals.Encode() + } +} + +func NewRequest(t *testing.T, method, path string, opts ...HTTPRequestOpt) *http.Request { + t.Helper() + req, err := http.NewRequest(method, "http://localhost"+path, nil) + if err != nil { + t.Fatalf("failed to make new HTTP request %v %v : %v", method, path, err) + } + for _, o := range opts { + o(req) + } + return req +} diff --git a/test/jetstream.go b/test/jetstream.go new file mode 100644 index 000000000..488c22beb --- /dev/null +++ b/test/jetstream.go @@ -0,0 +1,35 @@ +package test + +import ( + "encoding/json" + "testing" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/nats-io/nats.go" +) + +func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Msg) { + t.Helper() + for _, msg := range msgs { + if _, err := jsctx.PublishMsg(msg); err != nil { + t.Fatalf("MustPublishMsgs: failed to publish message: %s", err) + } + } +} + +func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { + t.Helper() + msg := &nats.Msg{ + Subject: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + var err error + msg.Data, err = json.Marshal(update) + if err != nil { + t.Fatalf("failed to marshal update: %s", err) + } + return msg +}