mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-14 23:03:50 +01:00
Rename kafka consumers to *Consumer (#294)
The prior naming was confusing, OutputRoomEvent consumed OutputNewRoomEvents
This commit is contained in:
parent
cf5ea25322
commit
ba0d0672ea
10 changed files with 59 additions and 59 deletions
|
@ -28,28 +28,28 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputRoomEvent consumes events that originated in the room server.
|
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||||
type OutputRoomEvent struct {
|
type OutputRoomEventConsumer struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *accounts.Database
|
db *accounts.Database
|
||||||
query api.RoomserverQueryAPI
|
query api.RoomserverQueryAPI
|
||||||
serverName string
|
serverName string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEvent(
|
func NewOutputRoomEventConsumer(
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
store *accounts.Database,
|
store *accounts.Database,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
) *OutputRoomEvent {
|
) *OutputRoomEventConsumer {
|
||||||
|
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &OutputRoomEvent{
|
s := &OutputRoomEventConsumer{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
query: queryAPI,
|
query: queryAPI,
|
||||||
|
@ -61,14 +61,14 @@ func NewOutputRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEvent) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
return s.roomServerConsumer.Start()
|
return s.roomServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
@ -104,7 +104,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupStateEvents looks up the state events that are added by a new event.
|
// lookupStateEvents looks up the state events that are added by a new event.
|
||||||
func (s *OutputRoomEvent) lookupStateEvents(
|
func (s *OutputRoomEventConsumer) lookupStateEvents(
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
// Fast path if there aren't any new state events.
|
// Fast path if there aren't any new state events.
|
||||||
|
|
|
@ -114,7 +114,7 @@ func main() {
|
||||||
}).Panic("Failed to setup kafka consumers")
|
}).Panic("Failed to setup kafka consumers")
|
||||||
}
|
}
|
||||||
|
|
||||||
consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI)
|
consumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, accountDB, queryAPI)
|
||||||
if err = consumer.Start(); err != nil {
|
if err = consumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer")
|
log.Panicf("startup: failed to start room server consumer")
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ func main() {
|
||||||
|
|
||||||
queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
|
queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
|
||||||
|
|
||||||
consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI)
|
consumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, queues, db, queryAPI)
|
||||||
if err = consumer.Start(); err != nil {
|
if err = consumer.Start(); err != nil {
|
||||||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,28 +285,28 @@ func (m *monolith) setupNotifiers() {
|
||||||
func (m *monolith) setupConsumers() {
|
func (m *monolith) setupConsumers() {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
|
clientAPIConsumer := clientapi_consumers.NewOutputRoomEventConsumer(
|
||||||
m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = clientAPIConsumer.Start(); err != nil {
|
if err = clientAPIConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
|
syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEventConsumer(
|
||||||
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = syncAPIRoomConsumer.Start(); err != nil {
|
if err = syncAPIRoomConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
|
syncAPIClientConsumer := syncapi_consumers.NewOutputClientDataConsumer(
|
||||||
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB,
|
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB,
|
||||||
)
|
)
|
||||||
if err = syncAPIClientConsumer.Start(); err != nil {
|
if err = syncAPIClientConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent(
|
publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEventConsumer(
|
||||||
m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), m.publicRoomsAPIDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = publicRoomsAPIConsumer.Start(); err != nil {
|
if err = publicRoomsAPIConsumer.Start(); err != nil {
|
||||||
|
@ -315,7 +315,7 @@ func (m *monolith) setupConsumers() {
|
||||||
|
|
||||||
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
||||||
|
|
||||||
federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
|
federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEventConsumer(
|
||||||
m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = federationSenderRoomConsumer.Start(); err != nil {
|
if err = federationSenderRoomConsumer.Start(); err != nil {
|
||||||
|
|
|
@ -73,7 +73,7 @@ func main() {
|
||||||
}).Panic("Failed to setup kafka consumers")
|
}).Panic("Failed to setup kafka consumers")
|
||||||
}
|
}
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db, queryAPI)
|
roomConsumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, db, queryAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create room server consumer: %s", err)
|
log.Panicf("startup: failed to create room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,11 +92,11 @@ func main() {
|
||||||
}).Panic("Failed to setup kafka consumers")
|
}).Panic("Failed to setup kafka consumers")
|
||||||
}
|
}
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI)
|
roomConsumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, n, db, queryAPI)
|
||||||
if err = roomConsumer.Start(); err != nil {
|
if err = roomConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db)
|
clientConsumer := consumers.NewOutputClientDataConsumer(cfg, kafkaConsumer, n, db)
|
||||||
if err = clientConsumer.Start(); err != nil {
|
if err = clientConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,28 +30,28 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputRoomEvent consumes events that originated in the room server.
|
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||||
type OutputRoomEvent struct {
|
type OutputRoomEventConsumer struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.Database
|
db *storage.Database
|
||||||
queues *queue.OutgoingQueues
|
queues *queue.OutgoingQueues
|
||||||
query api.RoomserverQueryAPI
|
query api.RoomserverQueryAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEvent(
|
func NewOutputRoomEventConsumer(
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
queues *queue.OutgoingQueues,
|
queues *queue.OutgoingQueues,
|
||||||
store *storage.Database,
|
store *storage.Database,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
) *OutputRoomEvent {
|
) *OutputRoomEventConsumer {
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &OutputRoomEvent{
|
s := &OutputRoomEventConsumer{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
queues: queues,
|
queues: queues,
|
||||||
|
@ -63,7 +63,7 @@ func NewOutputRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEvent) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
return s.roomServerConsumer.Start()
|
return s.roomServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,7 +71,7 @@ func (s *OutputRoomEvent) Start() error {
|
||||||
// It is unsafe to call this with messages for the same room in multiple gorountines
|
// It is unsafe to call this with messages for the same room in multiple gorountines
|
||||||
// because updates it will likely fail with a types.EventIDMismatchError when it
|
// because updates it will likely fail with a types.EventIDMismatchError when it
|
||||||
// realises that it cannot update the room state using the deltas.
|
// realises that it cannot update the room state using the deltas.
|
||||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
@ -108,7 +108,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
|
|
||||||
// processMessage updates the list of currently joined hosts in the room
|
// processMessage updates the list of currently joined hosts in the room
|
||||||
// and then sends the event to the hosts that were joined before the event.
|
// and then sends the event to the hosts that were joined before the event.
|
||||||
func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error {
|
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||||
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
|
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -164,7 +164,7 @@ func (s *OutputRoomEvent) processMessage(ore api.OutputNewRoomEvent) error {
|
||||||
// Usually the list can be calculated locally, but sometimes it will need fetch
|
// Usually the list can be calculated locally, but sometimes it will need fetch
|
||||||
// events from the room server.
|
// events from the room server.
|
||||||
// Returns an error if there was a problem talking to the room server.
|
// Returns an error if there was a problem talking to the room server.
|
||||||
func (s *OutputRoomEvent) joinedHostsAtEvent(
|
func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
|
||||||
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
|
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
|
||||||
) ([]gomatrixserverlib.ServerName, error) {
|
) ([]gomatrixserverlib.ServerName, error) {
|
||||||
// Combine the delta into a single delta so that the adds and removes can
|
// Combine the delta into a single delta so that the adds and removes can
|
||||||
|
@ -283,7 +283,7 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupStateEvents looks up the state events that are added by a new event.
|
// lookupStateEvents looks up the state events that are added by a new event.
|
||||||
func (s *OutputRoomEvent) lookupStateEvents(
|
func (s *OutputRoomEventConsumer) lookupStateEvents(
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
// Fast path if there aren't any new state events.
|
// Fast path if there aren't any new state events.
|
||||||
|
|
|
@ -26,26 +26,26 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputRoomEvent consumes events that originated in the room server.
|
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||||
type OutputRoomEvent struct {
|
type OutputRoomEventConsumer struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.PublicRoomsServerDatabase
|
db *storage.PublicRoomsServerDatabase
|
||||||
query api.RoomserverQueryAPI
|
query api.RoomserverQueryAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEvent(
|
func NewOutputRoomEventConsumer(
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
store *storage.PublicRoomsServerDatabase,
|
store *storage.PublicRoomsServerDatabase,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
) *OutputRoomEvent {
|
) *OutputRoomEventConsumer {
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &OutputRoomEvent{
|
s := &OutputRoomEventConsumer{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
query: queryAPI,
|
query: queryAPI,
|
||||||
|
@ -56,12 +56,12 @@ func NewOutputRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEvent) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
return s.roomServerConsumer.Start()
|
return s.roomServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
|
|
@ -26,27 +26,27 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputClientData consumes events that originated in the client API server.
|
// OutputClientDataConsumer consumes events that originated in the client API server.
|
||||||
type OutputClientData struct {
|
type OutputClientDataConsumer struct {
|
||||||
clientAPIConsumer *common.ContinualConsumer
|
clientAPIConsumer *common.ContinualConsumer
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
|
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputClientData(
|
func NewOutputClientDataConsumer(
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
n *sync.Notifier,
|
||||||
store *storage.SyncServerDatabase,
|
store *storage.SyncServerDatabase,
|
||||||
) *OutputClientData {
|
) *OutputClientDataConsumer {
|
||||||
|
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
Topic: string(cfg.Kafka.Topics.OutputClientData),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &OutputClientData{
|
s := &OutputClientDataConsumer{
|
||||||
clientAPIConsumer: &consumer,
|
clientAPIConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
@ -57,14 +57,14 @@ func NewOutputClientData(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputClientData) Start() error {
|
func (s *OutputClientDataConsumer) Start() error {
|
||||||
return s.clientAPIConsumer.Start()
|
return s.clientAPIConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the client API server output log.
|
// onMessage is called when the sync server receives a new event from the client API server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *OutputClientData) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output common.AccountData
|
var output common.AccountData
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
|
|
@ -30,29 +30,29 @@ import (
|
||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OutputRoomEvent consumes events that originated in the room server.
|
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||||
type OutputRoomEvent struct {
|
type OutputRoomEventConsumer struct {
|
||||||
roomServerConsumer *common.ContinualConsumer
|
roomServerConsumer *common.ContinualConsumer
|
||||||
db *storage.SyncServerDatabase
|
db *storage.SyncServerDatabase
|
||||||
notifier *sync.Notifier
|
notifier *sync.Notifier
|
||||||
query api.RoomserverQueryAPI
|
query api.RoomserverQueryAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEvent(
|
func NewOutputRoomEventConsumer(
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *sync.Notifier,
|
n *sync.Notifier,
|
||||||
store *storage.SyncServerDatabase,
|
store *storage.SyncServerDatabase,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
) *OutputRoomEvent {
|
) *OutputRoomEventConsumer {
|
||||||
|
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &OutputRoomEvent{
|
s := &OutputRoomEventConsumer{
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
@ -64,14 +64,14 @@ func NewOutputRoomEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEvent) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
return s.roomServerConsumer.Start()
|
return s.roomServerConsumer.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||||
// sync stream position may race and be incorrectly calculated.
|
// sync stream position may race and be incorrectly calculated.
|
||||||
func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// Parse out the event JSON
|
// Parse out the event JSON
|
||||||
var output api.OutputEvent
|
var output api.OutputEvent
|
||||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||||
|
@ -95,7 +95,7 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEvent) onNewRoomEvent(
|
func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
ctx context.Context, msg api.OutputNewRoomEvent,
|
ctx context.Context, msg api.OutputNewRoomEvent,
|
||||||
) error {
|
) error {
|
||||||
ev := msg.Event
|
ev := msg.Event
|
||||||
|
@ -152,7 +152,7 @@ func (s *OutputRoomEvent) onNewRoomEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEvent) onNewInviteEvent(
|
func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
||||||
ctx context.Context, msg api.OutputNewInviteEvent,
|
ctx context.Context, msg api.OutputNewInviteEvent,
|
||||||
) error {
|
) error {
|
||||||
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
||||||
|
@ -168,7 +168,7 @@ func (s *OutputRoomEvent) onNewInviteEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEvent) onRetireInviteEvent(
|
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
||||||
ctx context.Context, msg api.OutputRetireInviteEvent,
|
ctx context.Context, msg api.OutputRetireInviteEvent,
|
||||||
) error {
|
) error {
|
||||||
err := s.db.RetireInviteEvent(ctx, msg.EventID)
|
err := s.db.RetireInviteEvent(ctx, msg.EventID)
|
||||||
|
@ -186,7 +186,7 @@ func (s *OutputRoomEvent) onRetireInviteEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
// lookupStateEvents looks up the state events that are added by a new event.
|
// lookupStateEvents looks up the state events that are added by a new event.
|
||||||
func (s *OutputRoomEvent) lookupStateEvents(
|
func (s *OutputRoomEventConsumer) lookupStateEvents(
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
// Fast path if there aren't any new state events.
|
// Fast path if there aren't any new state events.
|
||||||
|
@ -242,7 +242,7 @@ func (s *OutputRoomEvent) lookupStateEvents(
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEvent) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
|
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.Event) (gomatrixserverlib.Event, error) {
|
||||||
var stateKey string
|
var stateKey string
|
||||||
if event.StateKey() == nil {
|
if event.StateKey() == nil {
|
||||||
stateKey = ""
|
stateKey = ""
|
||||||
|
|
Loading…
Reference in a new issue