diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 4ffd29610..2ccf0be1a 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -104,12 +104,7 @@ const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" const selectEventsWithEventIDsSQL = "" + - // TODO: The session_id and transaction_id blanks are here because - // the rowsToStreamEvents expects there to be exactly seven columns. We need to - // figure out if these really need to be in the DB, and if so, we need a - // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" + - " FROM syncapi_current_room_state WHERE event_id = ANY($1)" + "SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id = ANY($1)" const selectSharedUsersSQL = "" + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + @@ -365,7 +360,36 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed") - return rowsToStreamEvents(rows) + return currentRoomStateRowsToStreamEvents(rows) +} + +func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { + var events []types.StreamEvent + for rows.Next() { + var ( + eventID string + streamPos types.StreamPosition + eventBytes []byte + historyVisibility gomatrixserverlib.HistoryVisibility + ) + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil { + return nil, err + } + // TODO: Handle redacted events + var ev gomatrixserverlib.HeaderedEvent + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + return nil, err + } + + ev.Visibility = historyVisibility + + events = append(events, types.StreamEvent{ + HeaderedEvent: &ev, + StreamPosition: streamPos, + }) + } + + return events, nil } func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index c4019fed2..ff45e786e 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -88,12 +88,7 @@ const selectStateEventSQL = "" + "SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3" const selectEventsWithEventIDsSQL = "" + - // TODO: The session_id and transaction_id blanks are here because - // the rowsToStreamEvents expects there to be exactly seven columns. We need to - // figure out if these really need to be in the DB, and if so, we need a - // better permanent fix for this. - neilalexander, 2 Jan 2020 - "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" + - " FROM syncapi_current_room_state WHERE event_id IN ($1)" + "SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id IN ($1)" const selectSharedUsersSQL = "" + "SELECT state_key FROM syncapi_current_room_state WHERE room_id IN(" + @@ -378,7 +373,7 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( return nil, err } start = start + n - events, err := rowsToStreamEvents(rows) + events, err := currentRoomStateRowsToStreamEvents(rows) internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed") if err != nil { return nil, err @@ -388,6 +383,35 @@ func (s *currentRoomStateStatements) SelectEventsWithEventIDs( return res, nil } +func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { + var events []types.StreamEvent + for rows.Next() { + var ( + eventID string + streamPos types.StreamPosition + eventBytes []byte + historyVisibility gomatrixserverlib.HistoryVisibility + ) + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil { + return nil, err + } + // TODO: Handle redacted events + var ev gomatrixserverlib.HeaderedEvent + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { + return nil, err + } + + ev.Visibility = historyVisibility + + events = append(events, types.StreamEvent{ + HeaderedEvent: &ev, + StreamPosition: streamPos, + }) + } + + return events, nil +} + func rowsToEvents(rows *sql.Rows) ([]*gomatrixserverlib.HeaderedEvent, error) { result := []*gomatrixserverlib.HeaderedEvent{} for rows.Next() { diff --git a/syncapi/storage/tables/current_room_state_test.go b/syncapi/storage/tables/current_room_state_test.go new file mode 100644 index 000000000..23287c500 --- /dev/null +++ b/syncapi/storage/tables/current_room_state_test.go @@ -0,0 +1,88 @@ +package tables_test + +import ( + "context" + "database/sql" + "fmt" + "testing" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/syncapi/storage/postgres" + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/dendrite/test" +) + +func newCurrentRoomStateTable(t *testing.T, dbType test.DBType) (tables.CurrentRoomState, *sql.DB, func()) { + t.Helper() + connStr, close := test.PrepareDBConnectionString(t, dbType) + db, err := sqlutil.Open(&config.DatabaseOptions{ + ConnectionString: config.DataSource(connStr), + }, sqlutil.NewExclusiveWriter()) + if err != nil { + t.Fatalf("failed to open db: %s", err) + } + + var tab tables.CurrentRoomState + switch dbType { + case test.DBTypePostgres: + tab, err = postgres.NewPostgresCurrentRoomStateTable(db) + case test.DBTypeSQLite: + var stream sqlite3.StreamIDStatements + if err = stream.Prepare(db); err != nil { + t.Fatalf("failed to prepare stream stmts: %s", err) + } + tab, err = sqlite3.NewSqliteCurrentRoomStateTable(db, &stream) + } + if err != nil { + t.Fatalf("failed to make new table: %s", err) + } + return tab, db, close +} + +func TestCurrentRoomStateTable(t *testing.T) { + ctx := context.Background() + alice := test.NewUser(t) + room := test.NewRoom(t, alice) + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + tab, db, close := newCurrentRoomStateTable(t, dbType) + defer close() + events := room.CurrentState() + err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error { + for i, ev := range events { + err := tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i)) + if err != nil { + return fmt.Errorf("failed to UpsertRoomState: %w", err) + } + } + wantEventIDs := []string{ + events[0].EventID(), events[1].EventID(), events[2].EventID(), events[3].EventID(), + } + gotEvents, err := tab.SelectEventsWithEventIDs(ctx, txn, wantEventIDs) + if err != nil { + return fmt.Errorf("failed to SelectEventsWithEventIDs: %w", err) + } + if len(gotEvents) != len(wantEventIDs) { + return fmt.Errorf("SelectEventsWithEventIDs\ngot %d, want %d results", len(gotEvents), len(wantEventIDs)) + } + gotEventIDs := make(map[string]struct{}, len(gotEvents)) + for _, event := range gotEvents { + if event.ExcludeFromSync { + return fmt.Errorf("SelectEventsWithEventIDs ExcludeFromSync should be false for current room state event %+v", event) + } + gotEventIDs[event.EventID()] = struct{}{} + } + for _, id := range wantEventIDs { + if _, ok := gotEventIDs[id]; !ok { + return fmt.Errorf("SelectEventsWithEventIDs\nexpected id %q not returned", id) + } + } + return nil + }) + if err != nil { + t.Fatalf("err: %v", err) + } + }) +}