From 57b7097368f0be9d89ec740d1f6528322784b0e6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 21 Aug 2017 16:37:11 +0100 Subject: [PATCH] Add input API for adding invites to the roomserver. (#187) * Add input API for adding invites to the roomserver. This API handles invites received over federation that occur outside of a room. * Add some docstring for withTransaction * Use a nicer pattern for wrapping transactions * Fix MembershipUpdater method to not commit the transaction before returning it * Use the Transaction interface from common --- .../matrix-org/dendrite/common/sql.go | 36 +++++++--- .../dendrite/roomserver/api/input.go | 10 ++- .../dendrite/roomserver/input/events.go | 72 ++++++++++++++++--- .../dendrite/roomserver/input/input.go | 5 ++ .../roomserver/input/latest_events.go | 23 +++--- .../roomserver/storage/rooms_table.go | 16 +++-- .../dendrite/roomserver/storage/storage.go | 44 ++++++++++-- .../dendrite/roomserver/types/types.go | 13 +--- 8 files changed, 166 insertions(+), 53 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/common/sql.go b/src/github.com/matrix-org/dendrite/common/sql.go index cabbe6662..4abe7410e 100644 --- a/src/github.com/matrix-org/dendrite/common/sql.go +++ b/src/github.com/matrix-org/dendrite/common/sql.go @@ -18,6 +18,24 @@ import ( "database/sql" ) +// A Transaction is something that can be committed or rolledback. +type Transaction interface { + // Commit the transaction + Commit() error + // Rollback the transaction. + Rollback() error +} + +// EndTransaction ends a transaction. +// If the transaction succeeded then it is committed, otherwise it is rolledback. +func EndTransaction(txn Transaction, succeeded *bool) { + if *succeeded { + txn.Commit() + } else { + txn.Rollback() + } +} + // WithTransaction runs a block of code passing in an SQL transaction // If the code returns an error or panics then the transactions is rolledback // Otherwise the transaction is committed. @@ -26,16 +44,14 @@ func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { if err != nil { return } - defer func() { - if r := recover(); r != nil { - txn.Rollback() - panic(r) - } else if err != nil { - txn.Rollback() - } else { - err = txn.Commit() - } - }() + succeeded := false + defer EndTransaction(txn, &succeeded) + err = fn(txn) + if err != nil { + return + } + + succeeded = true return } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 558eb28c4..cbe7399ba 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -68,9 +68,17 @@ type InputRoomEvent struct { SendAsServer string `json:"send_as_server"` } +// InputInviteEvent is a matrix invite event received over federation without +// the usual context a matrix room event would have. We usually do not have +// access to the events needed to check the event auth rules for the invite. +type InputInviteEvent struct { + Event gomatrixserverlib.Event `json:"event"` +} + // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { - InputRoomEvents []InputRoomEvent `json:"input_room_events"` + InputRoomEvents []InputRoomEvent `json:"input_room_events"` + InputInviteEvents []InputInviteEvent `json:"input_invite_events"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index c1eee4c96..82b4652e6 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -15,6 +15,9 @@ package input import ( + "fmt" + + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -39,6 +42,8 @@ type RoomEventDatabase interface { GetLatestEventsForUpdate(roomNID types.RoomNID) (updater types.RoomRecentEventsUpdater, err error) // Lookup the string event IDs for a list of numeric event IDs EventIDs(eventNIDs []types.EventNID) (map[types.EventNID]string, error) + // Build a membership updater for the target user in a room. + MembershipUpdater(roomID, targerUserID string) (types.MembershipUpdater, error) } // OutputRoomEventWriter has the APIs needed to write an event to the output logs. @@ -103,13 +108,64 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api. return err } - // TODO: - // * Caculate the new current state for the room if the forward extremities have changed. - // * Work out the delta between the new current state and the previous current state. - // * Work out the visibility of the event. - // * Write a message to the output logs containing: - // - The event itself - // - The visiblity of the event, i.e. who is allowed to see the event. - // - The changes to the current state of the room. + return nil +} + +func processInviteEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputInviteEvent) (err error) { + if input.Event.StateKey() == nil { + return fmt.Errorf("invite must be a state event") + } + + roomID := input.Event.RoomID() + targetUserID := *input.Event.StateKey() + + updater, err := db.MembershipUpdater(roomID, targetUserID) + if err != nil { + return err + } + succeeded := false + defer common.EndTransaction(updater, &succeeded) + + if updater.IsJoin() { + // If the user is joined to the room then that takes precedence over this + // invite event. It makes little sense to move a user that is already + // joined to the room into the invite state. + // This could plausibly happen if an invite request raced with a join + // request for a user. For example if a user was invited to a public + // room and they joined the room at the same time as the invite was sent. + // The other way this could plausibly happen is if an invite raced with + // a kick. For example if a user was kicked from a room in error and in + // response someone else in the room re-invited them then it is possible + // for the invite request to race with the leave event so that the + // target receives invite before it learns that it has been kicked. + // There are a few ways this could be plausibly handled in the roomserver. + // 1) Store the invite, but mark it as retired. That will result in the + // permanent rejection of that invite event. So even if the target + // user leaves the room and the invite is retransmitted it will be + // ignored. However a new invite with a new event ID would still be + // accepted. + // 2) Silently discard the invite event. This means that if the event + // was retransmitted at a later date after the target user had left + // the room we would accept the invite. However since we hadn't told + // the sending server that the invite had been discarded it would + // have no reason to attempt to retry. + // 3) Signal the sending server that the user is already joined to the + // room. + // For now we will implement option 2. Since in the abesence of a retry + // mechanism it will be equivalent to option 1, and we don't have a + // signalling mechanism to implement option 3. + return nil + } + + outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil) + if err != nil { + return err + } + + if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil { + return err + } + + succeeded = true return nil } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go index 210abfa29..17e94599e 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -61,6 +61,11 @@ func (r *RoomserverInputAPI) InputRoomEvents( return err } } + for i := range request.InputInviteEvents { + if err := processInviteEvent(r.DB, r, request.InputInviteEvents[i]); err != nil { + return err + } + } return nil } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 9328ecf3b..d9aa2b455 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -17,6 +17,7 @@ package input import ( "bytes" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" @@ -52,25 +53,19 @@ func updateLatestEvents( if err != nil { return } - defer func() { - if err == nil { - // Commit if there wasn't an error. - // Set the returned err value if we encounter an error committing. - // This only works because err is a named return. - err = updater.Commit() - } else { - // Ignore any error we get rolling back since we don't want to - // clobber the current error - // TODO: log the error here. - updater.Rollback() - } - }() + succeeded := false + defer common.EndTransaction(updater, &succeeded) u := latestEventsUpdater{ db: db, updater: updater, ow: ow, roomNID: roomNID, stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, } - return u.doUpdateLatestEvents() + if err = u.doUpdateLatestEvents(); err != nil { + return err + } + + succeeded = true + return } // latestEventsUpdater tracks the state used to update the latest events in the diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go index 03cacd7db..24744fdff 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/rooms_table.go @@ -80,15 +80,23 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) { }.prepare(db) } -func (s *roomStatements) insertRoomNID(roomID string) (types.RoomNID, error) { +func (s *roomStatements) insertRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { var roomNID int64 - err := s.insertRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) + stmt := s.insertRoomNIDStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + err := stmt.QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } -func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) { +func (s *roomStatements) selectRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { var roomNID int64 - err := s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID) + stmt := s.selectRoomNIDStmt + if txn != nil { + stmt = txn.Stmt(stmt) + } + err := stmt.QueryRow(roomID).Scan(&roomNID) return types.RoomNID(roomNID), err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index 17b30860c..fbbc723ee 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -53,7 +53,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ err error ) - if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil { + if roomNID, err = d.assignRoomNID(nil, event.RoomID()); err != nil { return 0, types.StateAtEvent{}, err } @@ -104,15 +104,15 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ }, nil } -func (d *Database) assignRoomNID(roomID string) (types.RoomNID, error) { +func (d *Database) assignRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) { // Check if we already have a numeric ID in the database. - roomNID, err := d.statements.selectRoomNID(roomID) + roomNID, err := d.statements.selectRoomNID(txn, roomID) if err == sql.ErrNoRows { // We don't have a numeric ID so insert one into the database. - roomNID, err = d.statements.insertRoomNID(roomID) + roomNID, err = d.statements.insertRoomNID(txn, roomID) if err == sql.ErrNoRows { // We raced with another insert so run the select again. - roomNID, err = d.statements.selectRoomNID(roomID) + roomNID, err = d.statements.selectRoomNID(txn, roomID) } } return roomNID, err @@ -329,7 +329,7 @@ func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventSta // RoomNID implements query.RoomserverQueryAPIDB func (d *Database) RoomNID(roomID string) (types.RoomNID, error) { - roomNID, err := d.statements.selectRoomNID(roomID) + roomNID, err := d.statements.selectRoomNID(nil, roomID) if err == sql.ErrNoRows { return 0, nil } @@ -380,6 +380,38 @@ func (d *Database) StateEntriesForTuples( return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples) } +// MembershipUpdater implements input.RoomEventDatabase +func (d *Database) MembershipUpdater(roomID, targetUserID string) (types.MembershipUpdater, error) { + txn, err := d.db.Begin() + if err != nil { + return nil, err + } + succeeded := false + defer func() { + if !succeeded { + txn.Rollback() + } + }() + + roomNID, err := d.assignRoomNID(txn, roomID) + if err != nil { + return nil, err + } + + targetUserNID, err := d.assignStateKeyNID(txn, targetUserID) + if err != nil { + return nil, err + } + + updater, err := d.membershipUpdaterTxn(txn, roomNID, targetUserNID) + if err != nil { + return nil, err + } + + succeeded = true + return updater, nil +} + type membershipUpdater struct { transaction d *Database diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index e8bc99fcf..d5fe32762 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -16,6 +16,7 @@ package types import ( + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -135,14 +136,6 @@ type StateEntryList struct { StateEntries []StateEntry } -// A Transaction is something that can be committed or rolledback. -type Transaction interface { - // Commit the transaction - Commit() error - // Rollback the transaction. - Rollback() error -} - // A RoomRecentEventsUpdater is used to update the recent events in a room. // (On postgresql this wraps a database transaction that holds a "FOR UPDATE" // lock on the row in the rooms table holding the latest events for the room.) @@ -175,7 +168,7 @@ type RoomRecentEventsUpdater interface { // It will share the same transaction as this updater. MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error) // Implements Transaction so it can be committed or rolledback - Transaction + common.Transaction } // A MembershipUpdater is used to update the membership of a user in a room. @@ -200,7 +193,7 @@ type MembershipUpdater interface { // Returns a list of invite event IDs that this state change retired. SetToLeave(senderUserID string, eventID string) (inviteEventIDs []string, err error) // Implements Transaction so it can be committed or rolledback. - Transaction + common.Transaction } // A MissingEventError is an error that happened because the roomserver was