diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-roomserver/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-roomserver/main.go
index 920b1a34a..25972cf35 100644
--- a/src/github.com/matrix-org/dendrite/cmd/dendrite-roomserver/main.go
+++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-roomserver/main.go
@@ -2,16 +2,18 @@ package main
 
 import (
 	"fmt"
-	"github.com/matrix-org/dendrite/roomserver/input"
-	"github.com/matrix-org/dendrite/roomserver/query"
-	"github.com/matrix-org/dendrite/roomserver/storage"
-	"github.com/prometheus/client_golang/prometheus"
-	sarama "gopkg.in/Shopify/sarama.v1"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
 	"strconv"
 	"strings"
+
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/roomserver/input"
+	"github.com/matrix-org/dendrite/roomserver/query"
+	"github.com/matrix-org/dendrite/roomserver/storage"
+	"github.com/prometheus/client_golang/prometheus"
+	sarama "gopkg.in/Shopify/sarama.v1"
 )
 
 var (
@@ -43,10 +45,13 @@ func main() {
 	}
 
 	consumer := input.Consumer{
-		Consumer:             kafkaConsumer,
+		ContinualConsumer: common.ContinualConsumer{
+			Topic:          inputRoomEventTopic,
+			Consumer:       kafkaConsumer,
+			PartitionStore: db,
+		},
 		DB:                   db,
 		Producer:             kafkaProducer,
-		InputRoomEventTopic:  inputRoomEventTopic,
 		OutputRoomEventTopic: outputRoomEventTopic,
 	}
 
diff --git a/src/github.com/matrix-org/dendrite/common/consumers.go b/src/github.com/matrix-org/dendrite/common/consumers.go
new file mode 100644
index 000000000..891e080b2
--- /dev/null
+++ b/src/github.com/matrix-org/dendrite/common/consumers.go
@@ -0,0 +1,108 @@
+package common
+
+import (
+	"fmt"
+
+	sarama "gopkg.in/Shopify/sarama.v1"
+)
+
+// A PartitionOffset is the offset into a partition of the input log.
+type PartitionOffset struct {
+	// The ID of the partition.
+	Partition int32
+	// The offset into the partition.
+	Offset int64
+}
+
+// A PartitionStorer has the storage APIs needed by the consumer.
+type PartitionStorer interface {
+	// PartitionOffsets returns the offsets the consumer has reached for each partition.
+	PartitionOffsets(topic string) ([]PartitionOffset, error)
+	// SetPartitionOffset records where the consumer has reached for a partition.
+	SetPartitionOffset(topic string, partition int32, offset int64) error
+}
+
+// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
+// remember the offset it reached.
+type ContinualConsumer struct {
+	// The kafkaesque topic to consume events from.
+	// This is the name used in kafka to identify the stream to consume events from.
+	Topic string
+	// A kafkaesque stream consumer providing the APIs for talking to the event source.
+	// The interface is taken from a client library for Apache Kafka.
+	// But any equivalent event streaming protocol could be made to implement the same interface.
+	Consumer sarama.Consumer
+	// A thing which can load and save partition offsets for a topic.
+	PartitionStore PartitionStorer
+	// ProcessMessage is a function which will be called for each message in the log. Return ErrShutdown to
+	// stop processing messages; any other error is ignored and the consumer moves on to the next message.
+	ProcessMessage func(msg *sarama.ConsumerMessage) error
+	// ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition offset has been saved.
+	// It is optional.
+	ShutdownCallback func()
+}
+
+// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
+var ErrShutdown = fmt.Errorf("shutdown")
+
+// Start starts the consumer consuming.
+// Starts up a goroutine for each partition in the kafka stream.
+// Returns nil once all the goroutines are started.
+// Returns an error if it can't start consuming for any of the partitions.
+func (c *ContinualConsumer) Start() error {
+	offsets := map[int32]int64{}
+
+	partitions, err := c.Consumer.Partitions(c.Topic)
+	if err != nil {
+		return err
+	}
+	for _, partition := range partitions {
+		// Default all the offsets to the beginning of the stream.
+		offsets[partition] = sarama.OffsetOldest
+	}
+
+	storedOffsets, err := c.PartitionStore.PartitionOffsets(c.Topic)
+	if err != nil {
+		return err
+	}
+	for _, offset := range storedOffsets {
+		// We've already processed events from this partition so advance the offset to where we got to.
+		offsets[offset.Partition] = offset.Offset
+	}
+
+	var partitionConsumers []sarama.PartitionConsumer
+	for partition, offset := range offsets {
+		pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
+		if err != nil {
+			for _, p := range partitionConsumers {
+				p.Close()
+			}
+			return err
+		}
+		partitionConsumers = append(partitionConsumers, pc)
+	}
+	for _, pc := range partitionConsumers {
+		go c.consumePartition(pc)
+	}
+
+	return nil
+}
+
+// consumePartition consumes the room events for a single partition of the kafkaesque stream.
+func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
+	defer pc.Close()
+	for message := range pc.Messages() {
+		msgErr := c.ProcessMessage(message)
+		// Advance our position in the stream so that we will start at the right position after a restart.
+		if err := c.PartitionStore.SetPartitionOffset(c.Topic, message.Partition, message.Offset); err != nil {
+			panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
+		}
+		// Shutdown if we were told to do so.
+		if msgErr == ErrShutdown {
+			if c.ShutdownCallback != nil {
+				c.ShutdownCallback()
+			}
+			return
+		}
+	}
+}
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/partition_offset_table.go b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go
similarity index 57%
rename from src/github.com/matrix-org/dendrite/roomserver/storage/partition_offset_table.go
rename to src/github.com/matrix-org/dendrite/common/partition_offset_table.go
index a926f8777..904e0be81 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/partition_offset_table.go
+++ b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go
@@ -1,9 +1,6 @@
-package storage
+package common
 
-import (
-	"database/sql"
-	"github.com/matrix-org/dendrite/roomserver/types"
-)
+import "database/sql"
 
 const partitionOffsetsSchema = `
 -- The offsets that the server has processed up to.
@@ -26,32 +23,37 @@ const upsertPartitionOffsetsSQL = "" +
 	" ON CONFLICT ON CONSTRAINT topic_partition_unique" +
 	" DO UPDATE SET partition_offset = $3"
 
-type partitionOffsetStatements struct {
+// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
+type PartitionOffsetStatements struct {
 	selectPartitionOffsetsStmt *sql.Stmt
 	upsertPartitionOffsetStmt  *sql.Stmt
 }
 
-func (s *partitionOffsetStatements) prepare(db *sql.DB) (err error) {
+// Prepare converts the raw SQL statements into prepared statements.
+func (s *PartitionOffsetStatements) Prepare(db *sql.DB) (err error) {
 	_, err = db.Exec(partitionOffsetsSchema)
 	if err != nil {
 		return
 	}
-
-	return statementList{
-		{&s.selectPartitionOffsetsStmt, selectPartitionOffsetsSQL},
-		{&s.upsertPartitionOffsetStmt, upsertPartitionOffsetsSQL},
-	}.prepare(db)
+	if s.selectPartitionOffsetsStmt, err = db.Prepare(selectPartitionOffsetsSQL); err != nil {
+		return
+	}
+	if s.upsertPartitionOffsetStmt, err = db.Prepare(upsertPartitionOffsetsSQL); err != nil {
+		return
+	}
+	return
 }
 
-func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]types.PartitionOffset, error) {
+// SelectPartitionOffsets returns all the partition offsets for the given topic.
+func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]PartitionOffset, error) {
 	rows, err := s.selectPartitionOffsetsStmt.Query(topic)
 	if err != nil {
 		return nil, err
 	}
 	defer rows.Close()
-	var results []types.PartitionOffset
+	var results []PartitionOffset
 	for rows.Next() {
-		var offset types.PartitionOffset
+		var offset PartitionOffset
 		if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil {
 			return nil, err
 		}
@@ -59,7 +61,8 @@ func (s *partitionOffsetStatements) selectPartitionOffsets(topic string) ([]type
 	return results, nil
 }
 
-func (s *partitionOffsetStatements) upsertPartitionOffset(topic string, partition int32, offset int64) error {
+// UpsertPartitionOffset updates or inserts the partition offset for the given topic.
+func (s *PartitionOffsetStatements) UpsertPartitionOffset(topic string, partition int32, offset int64) error {
 	_, err := s.upsertPartitionOffsetStmt.Exec(topic, partition, offset)
 	return err
 }
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
index b433d707f..f2b065b67 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
@@ -4,19 +4,17 @@ package input
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/matrix-org/dendrite/roomserver/api"
-	"github.com/matrix-org/dendrite/roomserver/types"
-	sarama "gopkg.in/Shopify/sarama.v1"
 	"sync/atomic"
+
+	"github.com/matrix-org/dendrite/common"
+	"github.com/matrix-org/dendrite/roomserver/api"
+	sarama "gopkg.in/Shopify/sarama.v1"
 )
 
 // A ConsumerDatabase has the storage APIs needed by the consumer.
 type ConsumerDatabase interface {
 	RoomEventDatabase
-	// PartitionOffsets returns the offsets the consumer has reached for each partition.
-	PartitionOffsets(topic string) ([]types.PartitionOffset, error)
-	// SetPartitionOffset records where the consumer has reached for a partition.
-	SetPartitionOffset(topic string, partition int32, offset int64) error
+	common.PartitionStorer
 }
 
 // An ErrorLogger handles the errors encountered by the consumer.
@@ -31,16 +29,10 @@
 // The events needed to construct the state at the event should already be stored on the roomserver.
 // If the event is not valid then it will be discarded and an error will be logged.
 type Consumer struct {
-	// A kafkaesque stream consumer providing the APIs for talking to the event source.
-	// The interface is taken from a client library for Apache Kafka.
-	// But any equivalent event streaming protocol could be made to implement the same interface.
-	Consumer sarama.Consumer
+	ContinualConsumer common.ContinualConsumer
 	// The database used to store the room events.
 	DB ConsumerDatabase
 	Producer sarama.SyncProducer
-	// The kafkaesque topic to consume room events from.
-	// This is the name used in kafka to identify the stream to consume events from.
-	InputRoomEventTopic string
 	// The kafkaesque topic to output new room events to.
 	// This is the name used in kafka to identify the stream to write events to.
 	OutputRoomEventTopic string
@@ -75,79 +67,42 @@ func (c *Consumer) WriteOutputRoomEvent(output api.OutputRoomEvent) error {
 // Returns nil once all the goroutines are started.
 // Returns an error if it can't start consuming for any of the partitions.
 func (c *Consumer) Start() error {
-	offsets := map[int32]int64{}
+	c.ContinualConsumer.ProcessMessage = c.processMessage
+	c.ContinualConsumer.ShutdownCallback = c.shutdown
+	return c.ContinualConsumer.Start()
+}
 
-	partitions, err := c.Consumer.Partitions(c.InputRoomEventTopic)
-	if err != nil {
-		return err
-	}
-	for _, partition := range partitions {
-		// Default all the offsets to the beginning of the stream.
-		offsets[partition] = sarama.OffsetOldest
-	}
-
-	storedOffsets, err := c.DB.PartitionOffsets(c.InputRoomEventTopic)
-	if err != nil {
-		return err
-	}
-	for _, offset := range storedOffsets {
-		// We've already processed events from this partition so advance the offset to where we got to.
-		offsets[offset.Partition] = offset.Offset
-	}
-
-	var partitionConsumers []sarama.PartitionConsumer
-	for partition, offset := range offsets {
-		pc, err := c.Consumer.ConsumePartition(c.InputRoomEventTopic, partition, offset)
-		if err != nil {
-			for _, p := range partitionConsumers {
-				p.Close()
-			}
-			return err
+func (c *Consumer) processMessage(message *sarama.ConsumerMessage) error {
+	var input api.InputRoomEvent
+	if err := json.Unmarshal(message.Value, &input); err != nil {
+		// If the message is invalid then log it and move on to the next message in the stream.
+		c.logError(message, err)
+	} else {
+		if err := processRoomEvent(c.DB, c, input); err != nil {
+			// If there was an error processing the message then log it and
+			// move on to the next message in the stream.
+			// TODO: If the error was due to a problem talking to the database
+			// then we shouldn't move on to the next message and we should either
+			// retry processing the message, or panic and kill ourselves.
+			c.logError(message, err)
 		}
-		partitionConsumers = append(partitionConsumers, pc)
 	}
-	for _, pc := range partitionConsumers {
-		go c.consumePartition(pc)
+	// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
+	processed := atomic.AddInt64(&c.processed, 1)
+	// Check if we should stop processing.
+	// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
+	// If we try to stop processing after M messages and we have N goroutines then we will process somewhere
+	// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
+	// last message. We could be more careful here but this is good enough for getting rough benchmarks.
+	if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
+		return common.ErrShutdown
 	}
-
 	return nil
 }
 
-// consumePartition consumes the room events for a single partition of the kafkaesque stream.
-func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
-	defer pc.Close()
-	for message := range pc.Messages() {
-		var input api.InputRoomEvent
-		if err := json.Unmarshal(message.Value, &input); err != nil {
-			// If the message is invalid then log it and move onto the next message in the stream.
-			c.logError(message, err)
-		} else {
-			if err := processRoomEvent(c.DB, c, input); err != nil {
-				// If there was an error processing the message then log it and
-				// move onto the next message in the stream.
-				// TODO: If the error was due to a problem talking to the database
-				// then we shouldn't move onto the next message and we should either
-				// retry processing the message, or panic and kill ourselves.
-				c.logError(message, err)
-			}
-		}
-		// Advance our position in the stream so that we will start at the right position after a restart.
-		if err := c.DB.SetPartitionOffset(c.InputRoomEventTopic, message.Partition, message.Offset); err != nil {
-			c.logError(message, err)
-		}
-		// Update the number of processed messages using atomic addition because it is accessed from multiple goroutines.
-		processed := atomic.AddInt64(&c.processed, 1)
-		// Check if we should stop processing.
-		// Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages.
-		// If we try to stop processing after M message and we have N goroutines then we will process somewhere
-		// between M and (N + M) messages because the N goroutines could all try to process what they think will be the
-		// last message. We could be more careful here but this is good enough for getting rough benchmarks.
-		if c.StopProcessingAfter != nil && processed >= int64(*c.StopProcessingAfter) {
-			if c.ShutdownCallback != nil {
-				c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
-			}
-			return
-		}
+func (c *Consumer) shutdown() {
+	if c.ShutdownCallback != nil {
+		c.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", c.processed))
 	}
 }
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
index 31b48fd98..f52d421f7 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go
@@ -2,10 +2,12 @@ package storage
 
 import (
 	"database/sql"
+
+	"github.com/matrix-org/dendrite/common"
 )
 
 type statements struct {
-	partitionOffsetStatements
+	common.PartitionOffsetStatements
 	eventTypeStatements
 	eventStateKeyStatements
 	roomStatements
@@ -19,7 +21,7 @@ type statements struct {
 func (s *statements) prepare(db *sql.DB) error {
 	var err error
 
-	if err = s.partitionOffsetStatements.prepare(db); err != nil {
+	if err = s.PartitionOffsetStatements.Prepare(db); err != nil {
 		return err
 	}
 
diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
index d75c8d380..14874121a 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go
@@ -4,6 +4,7 @@ import (
 	"database/sql"
 	// Import the postgres database driver.
_ "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -28,13 +29,13 @@ func Open(dataSourceName string) (*Database, error) { } // PartitionOffsets implements input.ConsumerDatabase -func (d *Database) PartitionOffsets(topic string) ([]types.PartitionOffset, error) { - return d.statements.selectPartitionOffsets(topic) +func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.statements.SelectPartitionOffsets(topic) } // SetPartitionOffset implements input.ConsumerDatabase func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { - return d.statements.upsertPartitionOffset(topic, partition, offset) + return d.statements.UpsertPartitionOffset(topic, partition, offset) } // StoreEvent implements input.EventDatabase diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index 1d268e1f4..a921a0a74 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -5,14 +5,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -// A PartitionOffset is the offset into a partition of the input log. -type PartitionOffset struct { - // The ID of the partition. - Partition int32 - // The offset into the partition. - Offset int64 -} - // EventTypeNID is a numeric ID for an event type. type EventTypeNID int64