diff --git a/syncapi/storage/sqlite3/deltas/20201017140347_create_receipt_table.go b/syncapi/storage/sqlite3/deltas/20201017140347_create_receipt_table.go new file mode 100644 index 000000000..a6ac06840 --- /dev/null +++ b/syncapi/storage/sqlite3/deltas/20201017140347_create_receipt_table.go @@ -0,0 +1,46 @@ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable) +} + +func LoadCreateReceiptTable(m *sqlutil.Migrations) { + m.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable) +} + +func UpCreateReceiptTable(tx *sql.Tx) error { + _, err := tx.Exec(` +-- Stores data about receipts +CREATE TABLE IF NOT EXISTS syncapi_receipts ( + -- The ID + id BIGINT, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + receipt_ts BIGINT NOT NULL, + CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id_idx ON syncapi_receipts(room_id); +`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownCreateReceiptTable(tx *sql.Tx) error { + _, err := tx.Exec("DROP TABLE IF EXISTS syncapi_receipts;") + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go index b1770e801..4449891ba 100644 --- a/syncapi/storage/sqlite3/receipt_table.go +++ b/syncapi/storage/sqlite3/receipt_table.go @@ -63,10 +63,7 @@ type receiptStatements struct { } func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) { - _, err := db.Exec(receiptsSchema) - if err != nil { - return nil, err - } + var err error r := &receiptStatements{ db: db, streamIDStatements: streamID, @@ -80,6 +77,11 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re return r, nil } +func (r *receiptStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(receiptsSchema) + return err +} + // UpsertReceipt creates new user receipts func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { pos, err = r.streamIDStatements.nextStreamID(ctx, txn) diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 036e2b2e5..80d844e3a 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" ) // SyncServerDatasource represents a sync server datasource which manages @@ -46,6 +47,18 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e return nil, err } d.writer = sqlutil.NewExclusiveWriter() + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + r := receiptStatements{} + if err = r.execSchema(d.db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + deltas.LoadCreateReceiptTable(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return nil, err + } if err = d.prepare(); err != nil { return nil, err }