mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-14 09:43:48 +01:00
Add input API for adding invites to the roomserver. (#187)
* Add input API for adding invites to the roomserver. This API handles invites received over federation that occur outside of a room. * Add some docstring for withTransaction * Use a nicer pattern for wrapping transactions * Fix MembershipUpdater method to not commit the transaction before returning it * Use the Transaction interface from common
This commit is contained in:
parent
5950293e79
commit
57b7097368
8 changed files with 166 additions and 53 deletions
|
@ -18,6 +18,24 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// A Transaction is something that can be committed or rolledback.
|
||||||
|
type Transaction interface {
|
||||||
|
// Commit the transaction
|
||||||
|
Commit() error
|
||||||
|
// Rollback the transaction.
|
||||||
|
Rollback() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// EndTransaction ends a transaction.
|
||||||
|
// If the transaction succeeded then it is committed, otherwise it is rolledback.
|
||||||
|
func EndTransaction(txn Transaction, succeeded *bool) {
|
||||||
|
if *succeeded {
|
||||||
|
txn.Commit()
|
||||||
|
} else {
|
||||||
|
txn.Rollback()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithTransaction runs a block of code passing in an SQL transaction
|
// WithTransaction runs a block of code passing in an SQL transaction
|
||||||
// If the code returns an error or panics then the transactions is rolledback
|
// If the code returns an error or panics then the transactions is rolledback
|
||||||
// Otherwise the transaction is committed.
|
// Otherwise the transaction is committed.
|
||||||
|
@ -26,16 +44,14 @@ func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer func() {
|
succeeded := false
|
||||||
if r := recover(); r != nil {
|
defer EndTransaction(txn, &succeeded)
|
||||||
txn.Rollback()
|
|
||||||
panic(r)
|
|
||||||
} else if err != nil {
|
|
||||||
txn.Rollback()
|
|
||||||
} else {
|
|
||||||
err = txn.Commit()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
err = fn(txn)
|
err = fn(txn)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,9 +68,17 @@ type InputRoomEvent struct {
|
||||||
SendAsServer string `json:"send_as_server"`
|
SendAsServer string `json:"send_as_server"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InputInviteEvent is a matrix invite event received over federation without
|
||||||
|
// the usual context a matrix room event would have. We usually do not have
|
||||||
|
// access to the events needed to check the event auth rules for the invite.
|
||||||
|
type InputInviteEvent struct {
|
||||||
|
Event gomatrixserverlib.Event `json:"event"`
|
||||||
|
}
|
||||||
|
|
||||||
// InputRoomEventsRequest is a request to InputRoomEvents
|
// InputRoomEventsRequest is a request to InputRoomEvents
|
||||||
type InputRoomEventsRequest struct {
|
type InputRoomEventsRequest struct {
|
||||||
InputRoomEvents []InputRoomEvent `json:"input_room_events"`
|
InputRoomEvents []InputRoomEvent `json:"input_room_events"`
|
||||||
|
InputInviteEvents []InputInviteEvent `json:"input_invite_events"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// InputRoomEventsResponse is a response to InputRoomEvents
|
// InputRoomEventsResponse is a response to InputRoomEvents
|
||||||
|
|
|
@ -15,6 +15,9 @@
|
||||||
package input
|
package input
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
@ -39,6 +42,8 @@ type RoomEventDatabase interface {
|
||||||
GetLatestEventsForUpdate(roomNID types.RoomNID) (updater types.RoomRecentEventsUpdater, err error)
|
GetLatestEventsForUpdate(roomNID types.RoomNID) (updater types.RoomRecentEventsUpdater, err error)
|
||||||
// Lookup the string event IDs for a list of numeric event IDs
|
// Lookup the string event IDs for a list of numeric event IDs
|
||||||
EventIDs(eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
EventIDs(eventNIDs []types.EventNID) (map[types.EventNID]string, error)
|
||||||
|
// Build a membership updater for the target user in a room.
|
||||||
|
MembershipUpdater(roomID, targerUserID string) (types.MembershipUpdater, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
|
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
|
||||||
|
@ -103,13 +108,64 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO:
|
return nil
|
||||||
// * Caculate the new current state for the room if the forward extremities have changed.
|
}
|
||||||
// * Work out the delta between the new current state and the previous current state.
|
|
||||||
// * Work out the visibility of the event.
|
func processInviteEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputInviteEvent) (err error) {
|
||||||
// * Write a message to the output logs containing:
|
if input.Event.StateKey() == nil {
|
||||||
// - The event itself
|
return fmt.Errorf("invite must be a state event")
|
||||||
// - The visiblity of the event, i.e. who is allowed to see the event.
|
}
|
||||||
// - The changes to the current state of the room.
|
|
||||||
|
roomID := input.Event.RoomID()
|
||||||
|
targetUserID := *input.Event.StateKey()
|
||||||
|
|
||||||
|
updater, err := db.MembershipUpdater(roomID, targetUserID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
succeeded := false
|
||||||
|
defer common.EndTransaction(updater, &succeeded)
|
||||||
|
|
||||||
|
if updater.IsJoin() {
|
||||||
|
// If the user is joined to the room then that takes precedence over this
|
||||||
|
// invite event. It makes little sense to move a user that is already
|
||||||
|
// joined to the room into the invite state.
|
||||||
|
// This could plausibly happen if an invite request raced with a join
|
||||||
|
// request for a user. For example if a user was invited to a public
|
||||||
|
// room and they joined the room at the same time as the invite was sent.
|
||||||
|
// The other way this could plausibly happen is if an invite raced with
|
||||||
|
// a kick. For example if a user was kicked from a room in error and in
|
||||||
|
// response someone else in the room re-invited them then it is possible
|
||||||
|
// for the invite request to race with the leave event so that the
|
||||||
|
// target receives invite before it learns that it has been kicked.
|
||||||
|
// There are a few ways this could be plausibly handled in the roomserver.
|
||||||
|
// 1) Store the invite, but mark it as retired. That will result in the
|
||||||
|
// permanent rejection of that invite event. So even if the target
|
||||||
|
// user leaves the room and the invite is retransmitted it will be
|
||||||
|
// ignored. However a new invite with a new event ID would still be
|
||||||
|
// accepted.
|
||||||
|
// 2) Silently discard the invite event. This means that if the event
|
||||||
|
// was retransmitted at a later date after the target user had left
|
||||||
|
// the room we would accept the invite. However since we hadn't told
|
||||||
|
// the sending server that the invite had been discarded it would
|
||||||
|
// have no reason to attempt to retry.
|
||||||
|
// 3) Signal the sending server that the user is already joined to the
|
||||||
|
// room.
|
||||||
|
// For now we will implement option 2. Since in the abesence of a retry
|
||||||
|
// mechanism it will be equivalent to option 1, and we don't have a
|
||||||
|
// signalling mechanism to implement option 3.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,11 @@ func (r *RoomserverInputAPI) InputRoomEvents(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for i := range request.InputInviteEvents {
|
||||||
|
if err := processInviteEvent(r.DB, r, request.InputInviteEvents[i]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@ package input
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
@ -52,25 +53,19 @@ func updateLatestEvents(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer func() {
|
succeeded := false
|
||||||
if err == nil {
|
defer common.EndTransaction(updater, &succeeded)
|
||||||
// Commit if there wasn't an error.
|
|
||||||
// Set the returned err value if we encounter an error committing.
|
|
||||||
// This only works because err is a named return.
|
|
||||||
err = updater.Commit()
|
|
||||||
} else {
|
|
||||||
// Ignore any error we get rolling back since we don't want to
|
|
||||||
// clobber the current error
|
|
||||||
// TODO: log the error here.
|
|
||||||
updater.Rollback()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
u := latestEventsUpdater{
|
u := latestEventsUpdater{
|
||||||
db: db, updater: updater, ow: ow, roomNID: roomNID,
|
db: db, updater: updater, ow: ow, roomNID: roomNID,
|
||||||
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
|
stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
|
||||||
}
|
}
|
||||||
return u.doUpdateLatestEvents()
|
if err = u.doUpdateLatestEvents(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// latestEventsUpdater tracks the state used to update the latest events in the
|
// latestEventsUpdater tracks the state used to update the latest events in the
|
||||||
|
|
|
@ -80,15 +80,23 @@ func (s *roomStatements) prepare(db *sql.DB) (err error) {
|
||||||
}.prepare(db)
|
}.prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *roomStatements) insertRoomNID(roomID string) (types.RoomNID, error) {
|
func (s *roomStatements) insertRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) {
|
||||||
var roomNID int64
|
var roomNID int64
|
||||||
err := s.insertRoomNIDStmt.QueryRow(roomID).Scan(&roomNID)
|
stmt := s.insertRoomNIDStmt
|
||||||
|
if txn != nil {
|
||||||
|
stmt = txn.Stmt(stmt)
|
||||||
|
}
|
||||||
|
err := stmt.QueryRow(roomID).Scan(&roomNID)
|
||||||
return types.RoomNID(roomNID), err
|
return types.RoomNID(roomNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *roomStatements) selectRoomNID(roomID string) (types.RoomNID, error) {
|
func (s *roomStatements) selectRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) {
|
||||||
var roomNID int64
|
var roomNID int64
|
||||||
err := s.selectRoomNIDStmt.QueryRow(roomID).Scan(&roomNID)
|
stmt := s.selectRoomNIDStmt
|
||||||
|
if txn != nil {
|
||||||
|
stmt = txn.Stmt(stmt)
|
||||||
|
}
|
||||||
|
err := stmt.QueryRow(roomID).Scan(&roomNID)
|
||||||
return types.RoomNID(roomNID), err
|
return types.RoomNID(roomNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
if roomNID, err = d.assignRoomNID(event.RoomID()); err != nil {
|
if roomNID, err = d.assignRoomNID(nil, event.RoomID()); err != nil {
|
||||||
return 0, types.StateAtEvent{}, err
|
return 0, types.StateAtEvent{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,15 +104,15 @@ func (d *Database) StoreEvent(event gomatrixserverlib.Event, authEventNIDs []typ
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) assignRoomNID(roomID string) (types.RoomNID, error) {
|
func (d *Database) assignRoomNID(txn *sql.Tx, roomID string) (types.RoomNID, error) {
|
||||||
// Check if we already have a numeric ID in the database.
|
// Check if we already have a numeric ID in the database.
|
||||||
roomNID, err := d.statements.selectRoomNID(roomID)
|
roomNID, err := d.statements.selectRoomNID(txn, roomID)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We don't have a numeric ID so insert one into the database.
|
// We don't have a numeric ID so insert one into the database.
|
||||||
roomNID, err = d.statements.insertRoomNID(roomID)
|
roomNID, err = d.statements.insertRoomNID(txn, roomID)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// We raced with another insert so run the select again.
|
// We raced with another insert so run the select again.
|
||||||
roomNID, err = d.statements.selectRoomNID(roomID)
|
roomNID, err = d.statements.selectRoomNID(txn, roomID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return roomNID, err
|
return roomNID, err
|
||||||
|
@ -329,7 +329,7 @@ func (u *roomRecentEventsUpdater) MembershipUpdater(targetUserNID types.EventSta
|
||||||
|
|
||||||
// RoomNID implements query.RoomserverQueryAPIDB
|
// RoomNID implements query.RoomserverQueryAPIDB
|
||||||
func (d *Database) RoomNID(roomID string) (types.RoomNID, error) {
|
func (d *Database) RoomNID(roomID string) (types.RoomNID, error) {
|
||||||
roomNID, err := d.statements.selectRoomNID(roomID)
|
roomNID, err := d.statements.selectRoomNID(nil, roomID)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
@ -380,6 +380,38 @@ func (d *Database) StateEntriesForTuples(
|
||||||
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
|
return d.statements.bulkSelectFilteredStateBlockEntries(stateBlockNIDs, stateKeyTuples)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MembershipUpdater implements input.RoomEventDatabase
|
||||||
|
func (d *Database) MembershipUpdater(roomID, targetUserID string) (types.MembershipUpdater, error) {
|
||||||
|
txn, err := d.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
succeeded := false
|
||||||
|
defer func() {
|
||||||
|
if !succeeded {
|
||||||
|
txn.Rollback()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
roomNID, err := d.assignRoomNID(txn, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
targetUserNID, err := d.assignStateKeyNID(txn, targetUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
updater, err := d.membershipUpdaterTxn(txn, roomNID, targetUserNID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
succeeded = true
|
||||||
|
return updater, nil
|
||||||
|
}
|
||||||
|
|
||||||
type membershipUpdater struct {
|
type membershipUpdater struct {
|
||||||
transaction
|
transaction
|
||||||
d *Database
|
d *Database
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -135,14 +136,6 @@ type StateEntryList struct {
|
||||||
StateEntries []StateEntry
|
StateEntries []StateEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
// A Transaction is something that can be committed or rolledback.
|
|
||||||
type Transaction interface {
|
|
||||||
// Commit the transaction
|
|
||||||
Commit() error
|
|
||||||
// Rollback the transaction.
|
|
||||||
Rollback() error
|
|
||||||
}
|
|
||||||
|
|
||||||
// A RoomRecentEventsUpdater is used to update the recent events in a room.
|
// A RoomRecentEventsUpdater is used to update the recent events in a room.
|
||||||
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
|
// (On postgresql this wraps a database transaction that holds a "FOR UPDATE"
|
||||||
// lock on the row in the rooms table holding the latest events for the room.)
|
// lock on the row in the rooms table holding the latest events for the room.)
|
||||||
|
@ -175,7 +168,7 @@ type RoomRecentEventsUpdater interface {
|
||||||
// It will share the same transaction as this updater.
|
// It will share the same transaction as this updater.
|
||||||
MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error)
|
MembershipUpdater(targetUserNID EventStateKeyNID) (MembershipUpdater, error)
|
||||||
// Implements Transaction so it can be committed or rolledback
|
// Implements Transaction so it can be committed or rolledback
|
||||||
Transaction
|
common.Transaction
|
||||||
}
|
}
|
||||||
|
|
||||||
// A MembershipUpdater is used to update the membership of a user in a room.
|
// A MembershipUpdater is used to update the membership of a user in a room.
|
||||||
|
@ -200,7 +193,7 @@ type MembershipUpdater interface {
|
||||||
// Returns a list of invite event IDs that this state change retired.
|
// Returns a list of invite event IDs that this state change retired.
|
||||||
SetToLeave(senderUserID string, eventID string) (inviteEventIDs []string, err error)
|
SetToLeave(senderUserID string, eventID string) (inviteEventIDs []string, err error)
|
||||||
// Implements Transaction so it can be committed or rolledback.
|
// Implements Transaction so it can be committed or rolledback.
|
||||||
Transaction
|
common.Transaction
|
||||||
}
|
}
|
||||||
|
|
||||||
// A MissingEventError is an error that happened because the roomserver was
|
// A MissingEventError is an error that happened because the roomserver was
|
||||||
|
|
Loading…
Reference in a new issue