From 85ac8a3f5ba407ece584843a4d77466c1c4f5565 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 9 Jun 2020 12:07:33 +0100 Subject: [PATCH] Factor out how monolith routes get added (#1107) Previously we had 3 monoliths: - dendrite-monolith-server - dendrite-demo-libp2p - dendritejs which all had their own of setting up public routes. Factor this out into a new `setup.Monolith` struct which gets all dependencies set as fields. This is different to `basecomponent.Base` which doesn't provide any way to set configured deps (e.g public rooms db) Part of a larger process to clean up how we initialise Dendrite. --- clientapi/clientapi.go | 23 +++--- cmd/dendrite-client-api-server/main.go | 2 +- cmd/dendrite-demo-libp2p/main.go | 38 +++++----- cmd/dendrite-monolith-server/main.go | 42 +++++------ cmd/dendrite-public-rooms-api-server/main.go | 2 +- cmd/dendrite-sync-api-server/main.go | 2 +- cmd/dendritejs/main.go | 37 ++++++---- internal/setup/monolith.go | 78 ++++++++++++++++++++ publicroomsapi/publicroomsapi.go | 8 +- syncapi/syncapi.go | 14 ++-- 10 files changed, 170 insertions(+), 76 deletions(-) create mode 100644 internal/setup/monolith.go diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index d00be6eb0..987815c23 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -15,6 +15,7 @@ package clientapi import ( + "github.com/Shopify/sarama" "github.com/gorilla/mux" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" @@ -24,7 +25,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/routing" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/internal/basecomponent" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/transactions" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -34,7 +35,9 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( router *mux.Router, - base *basecomponent.BaseDendrite, + cfg *config.Dendrite, + consumer sarama.Consumer, + producer sarama.SyncProducer, deviceDB devices.Database, accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, @@ -49,24 +52,24 @@ func AddPublicRoutes( eduProducer := producers.NewEDUServerProducer(eduInputAPI) userUpdateProducer := &producers.UserUpdateProducer{ - Producer: base.KafkaProducer, - Topic: string(base.Cfg.Kafka.Topics.UserUpdates), + Producer: producer, + Topic: string(cfg.Kafka.Topics.UserUpdates), } syncProducer := &producers.SyncAPIProducer{ - Producer: base.KafkaProducer, - Topic: string(base.Cfg.Kafka.Topics.OutputClientData), + Producer: producer, + Topic: string(cfg.Kafka.Topics.OutputClientData), } - consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, accountsDB, rsAPI, + roomEventConsumer := consumers.NewOutputRoomEventConsumer( + cfg, consumer, accountsDB, rsAPI, ) - if err := consumer.Start(); err != nil { + if err := roomEventConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") } routing.Setup( - router, base.Cfg, roomserverProducer, rsAPI, asAPI, + router, cfg, roomserverProducer, rsAPI, asAPI, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, syncProducer, eduProducer, transactionsCache, fsAPI, ) diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 58396205c..9cf57ef68 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -39,7 +39,7 @@ func main() { eduInputAPI := base.EDUServerClient() clientapi.AddPublicRoutes( - base.PublicAPIMux, base, deviceDB, accountDB, federation, keyRing, + base.PublicAPIMux, base.Cfg, base.KafkaConsumer, base.KafkaProducer, deviceDB, accountDB, federation, keyRing, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, ) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 80e2e800f..377e017ef 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -29,20 +29,15 @@ import ( p2phttp "github.com/libp2p/go-libp2p-http" p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/transactions" - "github.com/matrix-org/dendrite/mediaapi" - "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/serverkeyapi" - "github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/eduserver/cache" @@ -151,26 +146,35 @@ func main() { &base.Base, cache.New(), deviceDB, ) asAPI := appservice.NewInternalAPI(&base.Base, accountDB, deviceDB, rsAPI) - appservice.AddPublicRoutes(base.Base.PublicAPIMux, &cfg, rsAPI, accountDB, federation, transactions.New()) fsAPI := federationsender.NewInternalAPI( &base.Base, federation, rsAPI, keyRing, ) rsAPI.SetFederationSenderAPI(fsAPI) - - clientapi.AddPublicRoutes( - base.Base.PublicAPIMux, &base.Base, deviceDB, accountDB, - federation, keyRing, rsAPI, - eduInputAPI, asAPI, transactions.New(), fsAPI, - ) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - federationapi.AddPublicRoutes(base.Base.PublicAPIMux, base.Base.Cfg, accountDB, deviceDB, federation, keyRing, rsAPI, asAPI, fsAPI, eduProducer) - mediaapi.AddPublicRoutes(base.Base.PublicAPIMux, base.Base.Cfg, deviceDB) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - publicroomsapi.AddPublicRoutes(base.Base.PublicAPIMux, &base.Base, deviceDB, publicRoomsDB, rsAPI, federation, nil) // Check this later - syncapi.AddPublicRoutes(base.Base.PublicAPIMux, &base.Base, deviceDB, accountDB, rsAPI, federation, &cfg) + + monolith := setup.Monolith{ + Config: base.Base.Cfg, + AccountDB: accountDB, + DeviceDB: deviceDB, + FedClient: federation, + KeyRing: keyRing, + KafkaConsumer: base.Base.KafkaConsumer, + KafkaProducer: base.Base.KafkaProducer, + + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + EDUProducer: eduProducer, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + ServerKeyAPI: serverKeyAPI, + + PublicRoomsDB: publicRoomsDB, + } + monolith.AddAllPublicRoutes(base.Base.PublicAPIMux) internal.SetupHTTPAPI( http.DefaultServeMux, diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 97fa3a2a6..dda3ade5a 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -19,23 +19,17 @@ import ( "net/http" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/transactions" - "github.com/matrix-org/dendrite/keyserver" - "github.com/matrix-org/dendrite/mediaapi" - "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/serverkeyapi" - "github.com/matrix-org/dendrite/syncapi" "github.com/sirupsen/logrus" ) @@ -97,7 +91,6 @@ func main() { } asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI) - appservice.AddPublicRoutes(base.PublicAPIMux, cfg, rsAPI, accountDB, federation, transactions.New()) if base.UseHTTPAPIs { appservice.AddInternalRoutes(base.InternalAPIMux, asAPI) asAPI = base.AppserviceHTTPClient() @@ -112,24 +105,31 @@ func main() { } rsComponent.SetFederationSenderAPI(fsAPI) - clientapi.AddPublicRoutes( - base.PublicAPIMux, base, deviceDB, accountDB, - federation, keyRing, rsAPI, - eduInputAPI, asAPI, transactions.New(), fsAPI, - ) - - keyserver.AddPublicRoutes( - base.PublicAPIMux, base.Cfg, deviceDB, accountDB, - ) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - federationapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing, rsAPI, asAPI, fsAPI, eduProducer) - mediaapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, deviceDB) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, publicRoomsDB, rsAPI, federation, nil) - syncapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, accountDB, rsAPI, federation, cfg) + + monolith := setup.Monolith{ + Config: base.Cfg, + AccountDB: accountDB, + DeviceDB: deviceDB, + FedClient: federation, + KeyRing: keyRing, + KafkaConsumer: base.KafkaConsumer, + KafkaProducer: base.KafkaProducer, + + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + EDUProducer: eduProducer, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + ServerKeyAPI: serverKeyAPI, + + PublicRoomsDB: publicRoomsDB, + } + monolith.AddAllPublicRoutes(base.PublicAPIMux) internal.SetupHTTPAPI( http.DefaultServeMux, diff --git a/cmd/dendrite-public-rooms-api-server/main.go b/cmd/dendrite-public-rooms-api-server/main.go index 413d7ecbb..ff7d7958f 100644 --- a/cmd/dendrite-public-rooms-api-server/main.go +++ b/cmd/dendrite-public-rooms-api-server/main.go @@ -34,7 +34,7 @@ func main() { if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, publicRoomsDB, rsAPI, nil, nil) + publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, base.KafkaConsumer, deviceDB, publicRoomsDB, rsAPI, nil, nil) base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI)) diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go index 4ad68c5e8..a5302f74e 100644 --- a/cmd/dendrite-sync-api-server/main.go +++ b/cmd/dendrite-sync-api-server/main.go @@ -30,7 +30,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() - syncapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, accountDB, rsAPI, federation, cfg) + syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, deviceDB, accountDB, rsAPI, federation, cfg) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 9c6c7c031..9901c6e5f 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -23,21 +23,16 @@ import ( "syscall/js" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/transactions" - "github.com/matrix-org/dendrite/mediaapi" - "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver" - "github.com/matrix-org/dendrite/syncapi" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" "github.com/matrix-org/gomatrixserverlib" @@ -215,20 +210,32 @@ func main() { rsAPI.SetFederationSenderAPI(fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) - clientapi.AddPublicRoutes( - base.PublicAPIMux, base, deviceDB, accountDB, - federation, &keyRing, rsAPI, - eduInputAPI, asQuery, transactions.New(), fedSenderAPI, - ) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - federationapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, &keyRing, rsAPI, asQuery, fedSenderAPI, eduProducer) - mediaapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, deviceDB) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") } - publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, publicRoomsDB, rsAPI, federation, p2pPublicRoomProvider) - syncapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, accountDB, rsAPI, federation, cfg) + + monolith := setup.Monolith{ + Config: base.Cfg, + AccountDB: accountDB, + DeviceDB: deviceDB, + FedClient: federation, + KeyRing: &keyRing, + KafkaConsumer: base.KafkaConsumer, + KafkaProducer: base.KafkaProducer, + + AppserviceAPI: asQuery, + EDUInternalAPI: eduInputAPI, + EDUProducer: eduProducer, + FederationSenderAPI: fedSenderAPI, + RoomserverAPI: rsAPI, + //ServerKeyAPI: serverKeyAPI, + + PublicRoomsDB: publicRoomsDB, + ExtPublicRoomsProvider: p2pPublicRoomProvider, + } + monolith.AddAllPublicRoutes(base.PublicAPIMux) internal.SetupHTTPAPI( http.DefaultServeMux, diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go new file mode 100644 index 000000000..d9bb28395 --- /dev/null +++ b/internal/setup/monolith.go @@ -0,0 +1,78 @@ +package setup + +import ( + "github.com/Shopify/sarama" + "github.com/gorilla/mux" + appserviceAPI "github.com/matrix-org/dendrite/appservice/api" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/clientapi/producers" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/federationapi" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/internal/transactions" + "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/mediaapi" + "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/publicroomsapi/types" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api" + "github.com/matrix-org/dendrite/syncapi" + "github.com/matrix-org/gomatrixserverlib" +) + +// Monolith represents an instantiation of all dependencies required to build +// all components of Dendrite, for use in monolith mode. +type Monolith struct { + Config *config.Dendrite + DeviceDB devices.Database + AccountDB accounts.Database + KeyRing *gomatrixserverlib.KeyRing + FedClient *gomatrixserverlib.FederationClient + KafkaConsumer sarama.Consumer + KafkaProducer sarama.SyncProducer + + AppserviceAPI appserviceAPI.AppServiceQueryAPI + EDUInternalAPI eduServerAPI.EDUServerInputAPI + FederationSenderAPI federationSenderAPI.FederationSenderInternalAPI + RoomserverAPI roomserverAPI.RoomserverInternalAPI + ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI + + // TODO: remove, this isn't even a producer + EDUProducer *producers.EDUServerProducer + // TODO: can we remove this? It's weird that we are required the database + // yet every other component can do that on its own. libp2p-demo uses a custom + // database though annoyingly. + PublicRoomsDB storage.Database + + // Optional + ExtPublicRoomsProvider types.ExternalPublicRoomsProvider +} + +// AddAllPublicRoutes attaches all public paths to the given router +func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { + clientapi.AddPublicRoutes( + publicMux, m.Config, m.KafkaConsumer, m.KafkaProducer, m.DeviceDB, m.AccountDB, + m.FedClient, m.KeyRing, m.RoomserverAPI, + m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), + m.FederationSenderAPI, + ) + + keyserver.AddPublicRoutes(publicMux, m.Config, m.DeviceDB, m.AccountDB) + federationapi.AddPublicRoutes( + publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient, + m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI, + m.EDUProducer, + ) + mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB) + publicroomsapi.AddPublicRoutes( + publicMux, m.Config, m.KafkaConsumer, m.DeviceDB, m.PublicRoomsDB, m.RoomserverAPI, m.FedClient, + m.ExtPublicRoomsProvider, + ) + syncapi.AddPublicRoutes( + publicMux, m.KafkaConsumer, m.DeviceDB, m.AccountDB, m.RoomserverAPI, m.FedClient, m.Config, + ) +} diff --git a/publicroomsapi/publicroomsapi.go b/publicroomsapi/publicroomsapi.go index 280ab9e3d..1f98a4e05 100644 --- a/publicroomsapi/publicroomsapi.go +++ b/publicroomsapi/publicroomsapi.go @@ -15,9 +15,10 @@ package publicroomsapi import ( + "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" - "github.com/matrix-org/dendrite/internal/basecomponent" + "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/publicroomsapi/consumers" "github.com/matrix-org/dendrite/publicroomsapi/routing" "github.com/matrix-org/dendrite/publicroomsapi/storage" @@ -31,7 +32,8 @@ import ( // component. func AddPublicRoutes( router *mux.Router, - base *basecomponent.BaseDendrite, + cfg *config.Dendrite, + consumer sarama.Consumer, deviceDB devices.Database, publicRoomsDB storage.Database, rsAPI roomserverAPI.RoomserverInternalAPI, @@ -39,7 +41,7 @@ func AddPublicRoutes( extRoomsProvider types.ExternalPublicRoomsProvider, ) { rsConsumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, publicRoomsDB, rsAPI, + cfg, consumer, publicRoomsDB, rsAPI, ) if err := rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start public rooms server consumer") diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 6e84dcb6f..40e652af4 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -17,11 +17,11 @@ package syncapi import ( "context" + "github.com/Shopify/sarama" "github.com/gorilla/mux" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -37,14 +37,14 @@ import ( // component. func AddPublicRoutes( router *mux.Router, - base *basecomponent.BaseDendrite, + consumer sarama.Consumer, deviceDB devices.Database, accountsDB accounts.Database, rsAPI api.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, ) { - syncDB, err := storage.NewSyncServerDatasource(string(base.Cfg.Database.SyncAPI), base.Cfg.DbProperties()) + syncDB, err := storage.NewSyncServerDatasource(string(cfg.Database.SyncAPI), cfg.DbProperties()) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } @@ -63,28 +63,28 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB) roomConsumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, notifier, syncDB, rsAPI, + cfg, consumer, notifier, syncDB, rsAPI, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") } clientConsumer := consumers.NewOutputClientDataConsumer( - base.Cfg, base.KafkaConsumer, notifier, syncDB, + cfg, consumer, notifier, syncDB, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - base.Cfg, base.KafkaConsumer, notifier, syncDB, + cfg, consumer, notifier, syncDB, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - base.Cfg, base.KafkaConsumer, notifier, syncDB, + cfg, consumer, notifier, syncDB, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer")