From b91b3e729a879f200be6b555e4922e4c916b87d2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 19 Sep 2017 17:15:46 +0100 Subject: [PATCH] Use a shared PostgreSQL sequence to generate ids. (#237) * Use a shared PostgreSQL sequence to generate ids. Share an auto incrementing sequnce between the account data and the room event table. This means that account data updates can be received independantly of room events updates. This should give some basic support for fixing #212 * Remove redundant 'primary key' * Re-number the SQL arguments * Fewer lies in comments --- .../syncapi/storage/account_data_table.go | 43 ++++++++++++++----- .../storage/output_room_events_table.go | 15 ++++--- .../dendrite/syncapi/storage/syncserver.go | 31 +++++++------ 3 files changed, 61 insertions(+), 28 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go index 4a23d6977..72aaf3551 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go @@ -18,14 +18,20 @@ import ( "context" "database/sql" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/types" ) const accountDataSchema = ` --- Stores the users account data +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + +-- Stores the types of account data that a user set has globally and in each room +-- and the stream ID when that type was last updated. CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( - -- The highest numeric ID from the output_room_events at the time of saving the data - id BIGINT, + -- An incrementing ID which denotes the position in the log that this event resides at. + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- ID of the user the data belongs to user_id TEXT NOT NULL, -- ID of the room the data is related to (empty string if not related to a specific room) @@ -33,8 +39,6 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( -- Type of the data type TEXT NOT NULL, - PRIMARY KEY(user_id, room_id, type), - -- We don't want two entries of the same type for the same user CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type) ); @@ -43,18 +47,23 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account ` const insertAccountDataSQL = "" + - "INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" + + "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + - " DO UPDATE SET id = EXCLUDED.id" + " DO UPDATE SET id = EXCLUDED.id" + + " RETURNING id" const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id ASC" +const selectMaxAccountDataIDSQL = "" + + "SELECT MAX(id) FROM syncapi_account_data_type" + type accountDataStatements struct { insertAccountDataStmt *sql.Stmt selectAccountDataInRangeStmt *sql.Stmt + selectMaxAccountDataIDStmt *sql.Stmt } func (s *accountDataStatements) prepare(db *sql.DB) (err error) { @@ -68,15 +77,17 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) { if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { return } + if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { + return + } return } func (s *accountDataStatements) insertAccountData( ctx context.Context, - pos types.StreamPosition, userID, roomID, dataType string, -) (err error) { - _, err = s.insertAccountDataStmt.ExecContext(ctx, pos, userID, roomID, dataType) +) (pos int64, err error) { + s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) return } @@ -116,3 +127,15 @@ func (s *accountDataStatements) selectAccountDataInRange( return } + +func (s *accountDataStatements) selectMaxAccountDataID( + ctx context.Context, txn *sql.Tx, +) (id int64, err error) { + var nullableID sql.NullInt64 + stmt := common.TxStmt(txn, s.selectMaxAccountDataIDStmt) + err = stmt.QueryRowContext(ctx).Scan(&nullableID) + if nullableID.Valid { + id = nullableID.Int64 + } + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 7ae24990a..660a3074d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -26,12 +26,15 @@ import ( ) const outputRoomEventsSchema = ` +-- This sequence is shared between all the tables generated from kafka logs. +CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; + -- Stores output room events received from the roomserver. CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- An incrementing ID which denotes the position in the log that this event resides at. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- This isn't a problem for us since we just want to order by this field. - id BIGSERIAL PRIMARY KEY, + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event event_id TEXT NOT NULL, -- The 'room_id' key for the event. @@ -60,7 +63,7 @@ const selectRecentEventsSQL = "" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" -const selectMaxIDSQL = "" + +const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). @@ -73,7 +76,7 @@ const selectStateInRangeSQL = "" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt - selectMaxIDStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt } @@ -89,7 +92,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { return } - if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil { + if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { return } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { @@ -170,11 +173,11 @@ func (s *outputRoomEventsStatements) selectStateInRange( // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. -func (s *outputRoomEventsStatements) selectMaxID( +func (s *outputRoomEventsStatements) selectMaxEventID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 - stmt := common.TxStmt(txn, s.selectMaxIDStmt) + stmt := common.TxStmt(txn, s.selectMaxEventIDStmt) err = stmt.QueryRowContext(ctx).Scan(&nullableID) if nullableID.Valid { id = nullableID.Int64 diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 6594b938c..a26b14c0e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -174,11 +174,24 @@ func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, o // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { - id, err := d.events.selectMaxID(ctx, nil) + return d.syncStreamPositionTx(ctx, nil) +} + +func (d *SyncServerDatabase) syncStreamPositionTx( + ctx context.Context, txn *sql.Tx, +) (types.StreamPosition, error) { + maxID, err := d.events.selectMaxEventID(ctx, txn) if err != nil { - return types.StreamPosition(0), err + return 0, err } - return types.StreamPosition(id), nil + maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + if err != nil { + return 0, err + } + if maxAccountDataID > maxID { + maxID = maxAccountDataID + } + return types.StreamPosition(maxID), nil } // IncrementalSync returns all the data needed in order to create an incremental sync response. @@ -271,11 +284,10 @@ func (d *SyncServerDatabase) CompleteSync( defer common.EndTransaction(txn, &succeeded) // Get the current stream position which we will base the sync response on. - id, err := d.events.selectMaxID(ctx, txn) + pos, err := d.syncStreamPositionTx(ctx, txn) if err != nil { return nil, err } - pos := types.StreamPosition(id) // Extract room state and recent events for all rooms the user is joined to. roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") @@ -348,13 +360,8 @@ func (d *SyncServerDatabase) GetAccountDataInRange( func (d *SyncServerDatabase) UpsertAccountData( ctx context.Context, userID, roomID, dataType string, ) (types.StreamPosition, error) { - pos, err := d.SyncStreamPosition(ctx) - if err != nil { - return pos, err - } - - err = d.accountData.insertAccountData(ctx, pos, userID, roomID, dataType) - return pos, err + pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType) + return types.StreamPosition(pos), err } func (d *SyncServerDatabase) addInvitesToResponse(