diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 49328edf4..e146d5734 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -34,6 +34,10 @@ type RoomEventDatabase interface { AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) // Set the state at an event. SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error + // Lookup the latest events in a room in preparation for an update. + // The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error. + // If this returns an error then no further action is required. + GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) } func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { @@ -85,10 +89,12 @@ func processRoomEvent(db RoomEventDatabase, input api.InputRoomEvent) error { db.SetState(stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID) } + // Update the extremities of the event graph for the room + if err := updateLatestEvents(db, roomNID, stateAtEvent, event); err != nil { + return err + } + // TODO: - // * Calcuate the state at the event if necessary. - // * Store the state at the event. - // * Update the extremities of the event graph for the room // * Caculate the new current state for the room if the forward extremities have changed. // * Work out the delta between the new current state and the previous current state. // * Work out the visibility of the event. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go new file mode 100644 index 000000000..c37c17001 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -0,0 +1,96 @@ +package input + +import ( + "bytes" + "github.com/matrix-org/dendrite/roomserver/types" + "github.com/matrix-org/gomatrixserverlib" +) + +// updateLatestEvents updates the list of latest events for this room. +// The latest events are the events that aren't referenced by another event in the database: +// +// Time goes down the page. 1 is the m.room.create event (root). +// +// 1 After storing 1 the latest events are {1} +// | After storing 2 the latest events are {2} +// 2 After storing 3 the latest events are {3} +// / \ After storing 4 the latest events are {3,4} +// 3 4 After storing 5 the latest events are {5,4} +// | | After storing 6 the latest events are {5,6} +// 5 6 <--- latest After storing 7 the latest events are {6,7} +// | +// 7 <----- latest +// +func updateLatestEvents( + db RoomEventDatabase, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, +) (err error) { + oldLatest, updater, err := db.GetLatestEventsForUpdate(roomNID) + if err != nil { + return + } + defer func() { + if err == nil { + // Commit if there wasn't an error. + // Set the returned err value if we encounter an error committing. + // This only works because err is a named return. + err = updater.Commit() + } else { + // Ignore any error we get rolling back since we don't want to + // clobber the current error + // TODO: log the error here. + updater.Rollback() + } + }() + + err = doUpdateLatestEvents(updater, oldLatest, roomNID, stateAtEvent, event) + return +} + +func doUpdateLatestEvents( + updater types.RoomRecentEventsUpdater, oldLatest []types.StateAtEventAndReference, roomNID types.RoomNID, stateAtEvent types.StateAtEvent, event gomatrixserverlib.Event, +) error { + var err error + var prevEvents []gomatrixserverlib.EventReference + prevEvents = event.PrevEvents() + + if err = updater.StorePreviousEvents(stateAtEvent.EventNID, prevEvents); err != nil { + return err + } + + // Check if this event references any of the latest events in the room. + var newLatest []types.StateAtEventAndReference + for _, l := range oldLatest { + for _, prevEvent := range prevEvents { + if l.EventID == prevEvent.EventID && bytes.Compare(l.EventSHA256, prevEvent.EventSHA256) == 0 { + // This event can be removed from the latest events cause we've found an event that references it. + // (If an event is referenced by another event then it can't be one of the latest events in the room + // because we have an event that comes after it) + continue + } + // Keep the event in the latest events. + newLatest = append(newLatest, l) + } + } + + eventReference := event.EventReference() + // Check if this event is already referenced by another event in the room. + var alreadyReferenced bool + if alreadyReferenced, err = updater.IsReferenced(eventReference); err != nil { + return err + } + + if !alreadyReferenced { + // This event is not referenced by any of the events in the room. + // Add it to the latest events + newLatest = append(newLatest, types.StateAtEventAndReference{ + StateAtEvent: stateAtEvent, + EventReference: eventReference, + }) + } + + if err = updater.SetLatestEvents(roomNID, newLatest); err != nil { + return err + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index 2f717f631..4a8957dd0 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -65,11 +65,16 @@ const bulkSelectStateAtEventByIDSQL = "" + const updateEventStateSQL = "" + "UPDATE events SET state_snapshot_nid = $2 WHERE event_nid = $1" +const bulkSelectStateAtEventAndReferenceSQL = "" + + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" + + " FROM events WHERE event_nid = ANY($1)" + type eventStatements struct { - insertEventStmt *sql.Stmt - bulkSelectStateEventByIDStmt *sql.Stmt - bulkSelectStateAtEventByIDStmt *sql.Stmt - updateEventStateStmt *sql.Stmt + insertEventStmt *sql.Stmt + bulkSelectStateEventByIDStmt *sql.Stmt + bulkSelectStateAtEventByIDStmt *sql.Stmt + updateEventStateStmt *sql.Stmt + bulkSelectStateAtEventAndReferenceStmt *sql.Stmt } func (s *eventStatements) prepare(db *sql.DB) (err error) { @@ -89,6 +94,9 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { if s.updateEventStateStmt, err = db.Prepare(updateEventStateSQL); err != nil { return } + if s.bulkSelectStateAtEventAndReferenceStmt, err = db.Prepare(bulkSelectStateAtEventAndReferenceSQL); err != nil { + return + } return } @@ -176,3 +184,43 @@ func (s *eventStatements) updateEventState(eventNID types.EventNID, stateNID typ _, err := s.updateEventStateStmt.Exec(int64(eventNID), int64(stateNID)) return err } + +func (s *eventStatements) bulkSelectStateAtEventAndReference(txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) { + nids := make([]int64, len(eventNIDs)) + for i := range eventNIDs { + nids[i] = int64(eventNIDs[i]) + } + rows, err := txn.Stmt(s.bulkSelectStateAtEventAndReferenceStmt).Query(pq.Int64Array(nids)) + if err != nil { + return nil, err + } + defer rows.Close() + results := make([]types.StateAtEventAndReference, len(eventNIDs)) + i := 0 + for ; rows.Next(); i++ { + var ( + eventTypeNID int64 + eventStateKeyNID int64 + eventNID int64 + stateSnapshotNID int64 + eventID string + eventSHA256 []byte + ) + if err = rows.Scan( + &eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256, + ); err != nil { + return nil, err + } + result := &results[i] + result.EventTypeNID = types.EventTypeNID(eventTypeNID) + result.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID) + result.EventNID = types.EventNID(eventNID) + result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID) + result.EventID = eventID + result.EventSHA256 = eventSHA256 + } + if i != len(eventNIDs) { + return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs)) + } + return results, nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go new file mode 100644 index 000000000..fe4601f00 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/previous_events_table.go @@ -0,0 +1,72 @@ +package storage + +import ( + "database/sql" + "github.com/matrix-org/dendrite/roomserver/types" +) + +const previousEventSchema = ` +-- The previous events table stores the event_ids referenced by the events +-- stored in the events table. +-- This is used to tell if a new event is already referenced by an event in +-- the database. +CREATE TABLE IF NOT EXISTS previous_events ( + -- The string event ID taken from the prev_events key of an event. + previous_event_id TEXT NOT NULL, + -- The SHA256 reference hash taken from the prev_events key of an event. + previous_reference_sha256 BYTEA NOT NULL, + -- A list of numeric event IDs of events that reference this prev_event. + event_nids BIGINT[] NOT NULL, + CONSTRAINT previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256) +); +` + +// Insert an entry into the previous_events table. +// If there is already an entry indicating that an event references that previous event then +// add the event NID to the list to indicate that this event references that previous event as well. +// This should only be modified while holding a "FOR UPDATE" lock on the row in the rooms table for this room. +// The lock is necessary to avoid data races when checking whether an event is already referenced by another event. +const insertPreviousEventSQL = "" + + "INSERT INTO previous_events" + + " (previous_event_id, previous_reference_sha256, event_nids)" + + " VALUES ($1, $2, array_append('{}'::bigint[], $3))" + + " ON CONFLICT ON CONSTRAINT previous_event_id_unique" + + " DO UPDATE SET event_nids = array_append(previous_events.event_nids, $3)" + + " WHERE $3 != ALL(previous_events.event_nids)" + +// Check if the event is referenced by another event in the table. +// This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room. +const selectPreviousEventExistsSQL = "" + + "SELECT 1 FROM previous_events" + + " WHERE previous_event_id = $1 AND previous_reference_sha256 = $2" + +type previousEventStatements struct { + insertPreviousEventStmt *sql.Stmt + selectPreviousEventExistsStmt *sql.Stmt +} + +func (s *previousEventStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(previousEventSchema) + if err != nil { + return + } + if s.insertPreviousEventStmt, err = db.Prepare(insertPreviousEventSQL); err != nil { + return + } + if s.selectPreviousEventExistsStmt, err = db.Prepare(selectPreviousEventExistsSQL); err != nil { + return + } + return +} + +func (s *previousEventStatements) insertPreviousEvent(txn *sql.Tx, previousEventID string, previousEventReferenceSHA256 []byte, eventNID types.EventNID) error { + _, err := txn.Stmt(s.insertPreviousEventStmt).Exec(previousEventID, previousEventReferenceSHA256, int64(eventNID)) + return err +} + +// Check if the event reference exists +// Returns sql.ErrNoRows if the event reference doesn't exist. +func (s *previousEventStatements) selectPreviousEventExists(txn *sql.Tx, eventID string, eventReferenceSHA256 []byte) error { + var ok int64 + return txn.Stmt(s.selectPreviousEventExistsStmt).QueryRow(eventID, eventReferenceSHA256).Scan(&ok) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 982fd24c8..07d668fac 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -2,6 +2,7 @@ package storage import ( "database/sql" + "github.com/lib/pq" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -11,7 +12,11 @@ CREATE TABLE IF NOT EXISTS rooms ( -- Local numeric ID for the room. room_nid BIGINT PRIMARY KEY DEFAULT nextval('room_nid_seq'), -- Textual ID for the room. - room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE + room_id TEXT NOT NULL CONSTRAINT room_id_unique UNIQUE, + -- The most recent events in the room that aren't referenced by another event. + -- This list may empty if the server hasn't joined the room yet. + -- (The server will be in that state while it stores the events for the initial state of the room) + latest_event_nids BIGINT[] NOT NULL ); ` @@ -25,9 +30,17 @@ const insertRoomNIDSQL = "" + const selectRoomNIDSQL = "" + "SELECT room_nid FROM rooms WHERE room_id = $1" +const selectLatestEventNIDsSQL = "" + + "SELECT latest_event_nids FROM rooms WHERE room_nid = $1 FOR UPDATE" + +const updateLatestEventNIDsSQL = "" + + "UPDATE rooms SET latest_event_nids = $2 WHERE room_nid = $1" + type roomStatements struct { - insertRoomNIDStmt *sql.Stmt - selectRoomNIDStmt *sql.Stmt + insertRoomNIDStmt *sql.Stmt + selectRoomNIDStmt *sql.Stmt + selectLatestEventNIDsStmt *sql.Stmt + updateLatestEventNIDsStmt *sql.Stmt } func (s *roomStatements) prepare(db *sql.DB) (err error) { @@ -41,6 +54,12 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { if s.selectRoomNIDStmt, err = db.Prepare(selectRoomNIDSQL); err != nil { return } + if s.selectLatestEventNIDsStmt, err = db.Prepare(selectLatestEventNIDsSQL); err != nil { + return + } + if s.updateLatestEventNIDsStmt, err = db.Prepare(updateLatestEventNIDsSQL); err != nil { + return + } return } @@ -55,3 +74,25 @@ func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) { err := s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } + +func (s *roomStatements) selectLatestEventsNIDsForUpdate(txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, error) { + var nids pq.Int64Array + err := txn.Stmt(s.selectLatestEventNIDsStmt).QueryRow(int64(roomNID)).Scan(&nids) + if err != nil { + return nil, err + } + eventNIDs := make([]types.EventNID, len(nids)) + for i := range nids { + eventNIDs[i] = types.EventNID(nids[i]) + } + return eventNIDs, nil +} + +func (s *roomStatements) updateLatestEventNIDs(txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID) error { + nids := make([]int64, len(eventNIDs)) + for i := range eventNIDs { + nids[i] = int64(eventNIDs[i]) + } + _, err := txn.Stmt(s.updateLatestEventNIDsStmt).Exec(roomNID, pq.Int64Array(nids)) + return err +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go index 976eab2b8..31b48fd98 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/sql.go @@ -13,6 +13,7 @@ type statements struct { eventJSONStatements stateSnapshotStatements stateBlockStatements + previousEventStatements } func (s *statements) prepare(db *sql.DB) error { @@ -50,5 +51,9 @@ func (s *statements) prepare(db *sql.DB) error { return err } + if err = s.previousEventStatements.prepare(db); err != nil { + return err + } + return nil } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 6162fcb7e..a49d0d84f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -195,3 +195,63 @@ func (d *Database) StateBlockNIDs(stateNIDs []types.StateSnapshotNID) ([]types.S func (d *Database) StateEntries(stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) { return d.statements.bulkSelectStateDataEntries(stateBlockNIDs) } + +// GetLatestEventsForUpdate implements input.EventDatabase +func (d *Database) GetLatestEventsForUpdate(roomNID types.RoomNID) ([]types.StateAtEventAndReference, types.RoomRecentEventsUpdater, error) { + txn, err := d.db.Begin() + if err != nil { + return nil, nil, err + } + eventNIDs, err := d.statements.selectLatestEventsNIDsForUpdate(txn, roomNID) + if err != nil { + txn.Rollback() + return nil, nil, err + } + stateAndRefs, err := d.statements.bulkSelectStateAtEventAndReference(txn, eventNIDs) + if err != nil { + txn.Rollback() + return nil, nil, err + } + return stateAndRefs, &roomRecentEventsUpdater{txn, d}, nil +} + +type roomRecentEventsUpdater struct { + txn *sql.Tx + d *Database +} + +func (u *roomRecentEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { + for _, ref := range previousEventReferences { + if err := u.d.statements.insertPreviousEvent(u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { + return err + } + } + return nil +} + +func (u *roomRecentEventsUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { + err := u.d.statements.selectPreviousEventExists(u.txn, eventReference.EventID, eventReference.EventSHA256) + if err == nil { + return true, nil + } + if err == sql.ErrNoRows { + return false, nil + } + return false, err +} + +func (u *roomRecentEventsUpdater) SetLatestEvents(roomNID types.RoomNID, latest []types.StateAtEventAndReference) error { + eventNIDs := make([]types.EventNID, len(latest)) + for i := range latest { + eventNIDs[i] = latest[i].EventNID + } + return u.d.statements.updateLatestEventNIDs(u.txn, roomNID, eventNIDs) +} + +func (u *roomRecentEventsUpdater) Commit() error { + return u.txn.Commit() +} + +func (u *roomRecentEventsUpdater) Rollback() error { + return u.txn.Rollback() +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index 000238495..718c4deb1 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -79,6 +79,15 @@ func (s StateAtEvent) IsStateEvent() bool { return s.EventStateKeyNID != 0 } +// StateAtEventAndReference is StateAtEvent and gomatrixserverlib.EventReference glued together. +// It is used when looking up the latest events in a room in the database. +// The gomatrixserverlib.EventReference is used to check whether a new event references the event. +// The StateAtEvent is used to construct the current state of the room from the latest events. +type StateAtEventAndReference struct { + StateAtEvent + gomatrixserverlib.EventReference +} + // An Event is a gomatrixserverlib.Event with the numeric event ID attached. // It is when performing bulk event lookup in the database. type Event struct { @@ -119,3 +128,24 @@ type StateEntryList struct { StateBlockNID StateBlockNID StateEntries []StateEntry } + +// A RoomRecentEventsUpdater is used to update the recent events in a room. +// (On postgresql this wraps a database transaction that holds a "FOR UPDATE" +// lock on the row holding the latest events for the room.) +type RoomRecentEventsUpdater interface { + // Store the previous events referenced by an event. + // This adds the event NID to an entry in the database for each of the previous events. + // If there isn't an entry for one of previous events then an entry is created. + // If the entry already lists the event NID as a referrer then the entry unmodified. + // (i.e. the operation is idempotent) + StorePreviousEvents(eventNID EventNID, previousEventReferences []gomatrixserverlib.EventReference) error + // Check whether the eventReference is already referenced by another matrix event. + IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) + // Set the list of latest events for the room. + // This replaces the current list stored in the database with the given list + SetLatestEvents(roomNID RoomNID, latest []StateAtEventAndReference) error + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +}