mirror of
https://github.com/matrix-org/dendrite
synced 2024-12-15 01:03:43 +01:00
Cleanup syncapi topology logic (#1035)
* Cleanup syncapi topology logic * Variable renaming * comments
This commit is contained in:
parent
3cb04e8004
commit
7ca230e931
9 changed files with 67 additions and 154 deletions
|
@ -245,24 +245,20 @@ func (r *messagesReq) retrieveEvents() (
|
||||||
// change the way topological positions are defined (as depth isn't the most
|
// change the way topological positions are defined (as depth isn't the most
|
||||||
// reliable way to define it), it would be easier and less troublesome to
|
// reliable way to define it), it would be easier and less troublesome to
|
||||||
// only have to change it in one place, i.e. the database.
|
// only have to change it in one place, i.e. the database.
|
||||||
startPos, startStreamPos, err := r.db.EventPositionInTopology(
|
start, err = r.db.EventPositionInTopology(
|
||||||
r.ctx, events[0].EventID(),
|
r.ctx, events[0].EventID(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("EventPositionInTopology: for start event %s: %w", events[0].EventID(), err)
|
err = fmt.Errorf("EventPositionInTopology: for start event %s: %w", events[0].EventID(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
endPos, endStreamPos, err := r.db.EventPositionInTopology(
|
end, err = r.db.EventPositionInTopology(
|
||||||
r.ctx, events[len(events)-1].EventID(),
|
r.ctx, events[len(events)-1].EventID(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("EventPositionInTopology: for end event %s: %w", events[len(events)-1].EventID(), err)
|
err = fmt.Errorf("EventPositionInTopology: for end event %s: %w", events[len(events)-1].EventID(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Generate pagination tokens to send to the client using the positions
|
|
||||||
// retrieved previously.
|
|
||||||
start = types.NewTopologyToken(startPos, startStreamPos)
|
|
||||||
end = types.NewTopologyToken(endPos, endStreamPos)
|
|
||||||
|
|
||||||
if r.backwardOrdering {
|
if r.backwardOrdering {
|
||||||
// A stream/topological position is a cursor located between two events.
|
// A stream/topological position is a cursor located between two events.
|
||||||
|
@ -431,14 +427,10 @@ func setToDefault(
|
||||||
) (to types.TopologyToken, err error) {
|
) (to types.TopologyToken, err error) {
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
// go 1 earlier than the first event so we correctly fetch the earliest event
|
// go 1 earlier than the first event so we correctly fetch the earliest event
|
||||||
|
// this is because Database.GetEventsInTopologicalRange is exclusive of the lower-bound.
|
||||||
to = types.NewTopologyToken(0, 0)
|
to = types.NewTopologyToken(0, 0)
|
||||||
} else {
|
} else {
|
||||||
var depth, stream types.StreamPosition
|
to, err = db.MaxTopologicalPosition(ctx, roomID)
|
||||||
depth, stream, err = db.MaxTopologicalPosition(ctx, roomID)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
to = types.NewTopologyToken(depth, stream)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -93,15 +93,12 @@ type Database interface {
|
||||||
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
|
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
|
||||||
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
|
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
|
||||||
// EventPositionInTopology returns the depth and stream position of the given event.
|
// EventPositionInTopology returns the depth and stream position of the given event.
|
||||||
EventPositionInTopology(ctx context.Context, eventID string) (depth types.StreamPosition, stream types.StreamPosition, err error)
|
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
|
||||||
// EventsAtTopologicalPosition returns all of the events matching a given
|
|
||||||
// position in the topology of a given room.
|
|
||||||
EventsAtTopologicalPosition(ctx context.Context, roomID string, pos types.StreamPosition) ([]types.StreamEvent, error)
|
|
||||||
// BackwardExtremitiesForRoom returns the event IDs of all of the backward
|
// BackwardExtremitiesForRoom returns the event IDs of all of the backward
|
||||||
// extremities we know of for a given room.
|
// extremities we know of for a given room.
|
||||||
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
|
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities []string, err error)
|
||||||
// MaxTopologicalPosition returns the highest topological position for a given room.
|
// MaxTopologicalPosition returns the highest topological position for a given room.
|
||||||
MaxTopologicalPosition(ctx context.Context, roomID string) (depth types.StreamPosition, stream types.StreamPosition, err error)
|
MaxTopologicalPosition(ctx context.Context, roomID string) (types.TopologyToken, error)
|
||||||
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
|
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
|
||||||
// matches the streamevent.transactionID device then the transaction ID gets
|
// matches the streamevent.transactionID device then the transaction ID gets
|
||||||
// added to the unsigned section of the output event.
|
// added to the unsigned section of the output event.
|
||||||
|
|
|
@ -72,17 +72,12 @@ const selectMaxPositionInTopologySQL = "" +
|
||||||
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" +
|
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology WHERE room_id=$1" +
|
||||||
") ORDER BY stream_position DESC LIMIT 1"
|
") ORDER BY stream_position DESC LIMIT 1"
|
||||||
|
|
||||||
const selectEventIDsFromPositionSQL = "" +
|
|
||||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
|
||||||
" WHERE room_id = $1 AND topological_position = $2"
|
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||||
selectEventIDsInRangeDESCStmt *sql.Stmt
|
selectEventIDsInRangeDESCStmt *sql.Stmt
|
||||||
selectPositionInTopologyStmt *sql.Stmt
|
selectPositionInTopologyStmt *sql.Stmt
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsFromPositionStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
|
@ -106,9 +101,6 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,7 +119,7 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||||
// given range in a given room's topological order.
|
// given range in a given room's topological order.
|
||||||
// Returns an empty slice if no events match the given range.
|
// Returns an empty slice if no events match the given range.
|
||||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition,
|
ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition,
|
||||||
limit int, chronologicalOrder bool,
|
limit int, chronologicalOrder bool,
|
||||||
) (eventIDs []string, err error) {
|
) (eventIDs []string, err error) {
|
||||||
// Decide on the selection's order according to whether chronological order
|
// Decide on the selection's order according to whether chronological order
|
||||||
|
@ -140,7 +132,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query the event IDs.
|
// Query the event IDs.
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit)
|
rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// If no event matched the request, return an empty slice.
|
// If no event matched the request, return an empty slice.
|
||||||
return []string{}, nil
|
return []string{}, nil
|
||||||
|
@ -176,28 +168,3 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
|
||||||
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectEventIDsFromPosition returns the IDs of all events that have a given
|
|
||||||
// position in the topology of a given room.
|
|
||||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition(
|
|
||||||
ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition,
|
|
||||||
) (eventIDs []string, err error) {
|
|
||||||
// Query the event IDs.
|
|
||||||
rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
// If no event matched the request, return an empty slice.
|
|
||||||
return []string{}, nil
|
|
||||||
} else if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer common.CloseAndLogIfError(ctx, rows, "selectEventIDsFromPosition: rows.close() failed")
|
|
||||||
// Return the IDs.
|
|
||||||
var eventID string
|
|
||||||
for rows.Next() {
|
|
||||||
if err = rows.Scan(&eventID); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
eventIDs = append(eventIDs, eventID)
|
|
||||||
}
|
|
||||||
return eventIDs, rows.Err()
|
|
||||||
}
|
|
||||||
|
|
|
@ -319,25 +319,25 @@ func (d *Database) GetEventsInTopologicalRange(
|
||||||
roomID string, limit int,
|
roomID string, limit int,
|
||||||
backwardOrdering bool,
|
backwardOrdering bool,
|
||||||
) (events []types.StreamEvent, err error) {
|
) (events []types.StreamEvent, err error) {
|
||||||
// Determine the backward and forward limit, i.e. the upper and lower
|
var minDepth, maxDepth, maxStreamPosForMaxDepth types.StreamPosition
|
||||||
// limits to the selection in the room's topology, from the direction.
|
|
||||||
var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
|
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
// Backward ordering is antichronological (latest event to oldest
|
// Backward ordering means the 'from' token has a higher depth than the 'to' token
|
||||||
// one).
|
minDepth = to.Depth()
|
||||||
backwardLimit = to.Depth()
|
maxDepth = from.Depth()
|
||||||
forwardLimit = from.Depth()
|
// for cases where we have say 5 events with the same depth, the TopologyToken needs to
|
||||||
forwardMicroLimit = from.PDUPosition()
|
// know which of the 5 the client has seen. This is done by using the PDU position.
|
||||||
|
// Events with the same maxDepth but less than this PDU position will be returned.
|
||||||
|
maxStreamPosForMaxDepth = from.PDUPosition()
|
||||||
} else {
|
} else {
|
||||||
// Forward ordering is chronological (oldest event to latest one).
|
// Forward ordering means the 'from' token has a lower depth than the 'to' token.
|
||||||
backwardLimit = from.Depth()
|
minDepth = from.Depth()
|
||||||
forwardLimit = to.Depth()
|
maxDepth = to.Depth()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Select the event IDs from the defined range.
|
// Select the event IDs from the defined range.
|
||||||
var eIDs []string
|
var eIDs []string
|
||||||
eIDs, err = d.Topology.SelectEventIDsInRange(
|
eIDs, err = d.Topology.SelectEventIDsInRange(
|
||||||
ctx, nil, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
|
ctx, nil, roomID, minDepth, maxDepth, maxStreamPosForMaxDepth, limit, !backwardOrdering,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -350,7 +350,7 @@ func (d *Database) GetEventsInTopologicalRange(
|
||||||
|
|
||||||
func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
|
func (d *Database) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
|
||||||
err = common.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
err = common.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
||||||
pos, err := d.SyncPositionTx(ctx, txn)
|
pos, err := d.syncPositionTx(ctx, txn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -368,29 +368,25 @@ func (d *Database) BackwardExtremitiesForRoom(
|
||||||
|
|
||||||
func (d *Database) MaxTopologicalPosition(
|
func (d *Database) MaxTopologicalPosition(
|
||||||
ctx context.Context, roomID string,
|
ctx context.Context, roomID string,
|
||||||
) (depth types.StreamPosition, stream types.StreamPosition, err error) {
|
) (types.TopologyToken, error) {
|
||||||
return d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
|
depth, streamPos, err := d.Topology.SelectMaxPositionInTopology(ctx, nil, roomID)
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Database) EventsAtTopologicalPosition(
|
|
||||||
ctx context.Context, roomID string, pos types.StreamPosition,
|
|
||||||
) ([]types.StreamEvent, error) {
|
|
||||||
eIDs, err := d.Topology.SelectEventIDsFromPosition(ctx, nil, roomID, pos)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.NewTopologyToken(0, 0), err
|
||||||
}
|
}
|
||||||
|
return types.NewTopologyToken(depth, streamPos), nil
|
||||||
return d.OutputEvents.SelectEvents(ctx, nil, eIDs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) EventPositionInTopology(
|
func (d *Database) EventPositionInTopology(
|
||||||
ctx context.Context, eventID string,
|
ctx context.Context, eventID string,
|
||||||
) (depth types.StreamPosition, stream types.StreamPosition, err error) {
|
) (types.TopologyToken, error) {
|
||||||
return d.Topology.SelectPositionInTopology(ctx, nil, eventID)
|
depth, stream, err := d.Topology.SelectPositionInTopology(ctx, nil, eventID)
|
||||||
|
if err != nil {
|
||||||
|
return types.NewTopologyToken(0, 0), err
|
||||||
|
}
|
||||||
|
return types.NewTopologyToken(depth, stream), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO FIXME TEMPORARY PUBLIC
|
func (d *Database) syncPositionTx(
|
||||||
func (d *Database) SyncPositionTx(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
) (sp types.StreamingToken, err error) {
|
) (sp types.StreamingToken, err error) {
|
||||||
|
|
||||||
|
@ -466,7 +462,7 @@ func (d *Database) addPDUDeltaToResponse(
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This should be done in getStateDeltas
|
// TODO: This should be done in getStateDeltas
|
||||||
if err = d.AddInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
|
if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -510,8 +506,7 @@ func (d *Database) addTypingDeltaToResponse(
|
||||||
|
|
||||||
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
|
// addEDUDeltaToResponse adds updates for EDUs of each type since fromPos if
|
||||||
// the positions of that type are not equal in fromPos and toPos.
|
// the positions of that type are not equal in fromPos and toPos.
|
||||||
// TODO FIXME TEMPORARY PUBLIC
|
func (d *Database) addEDUDeltaToResponse(
|
||||||
func (d *Database) AddEDUDeltaToResponse(
|
|
||||||
fromPos, toPos types.StreamingToken,
|
fromPos, toPos types.StreamingToken,
|
||||||
joinedRoomIDs []string,
|
joinedRoomIDs []string,
|
||||||
res *types.Response,
|
res *types.Response,
|
||||||
|
@ -551,7 +546,7 @@ func (d *Database) IncrementalSync(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.AddEDUDeltaToResponse(
|
err = d.addEDUDeltaToResponse(
|
||||||
fromPos, toPos, joinedRoomIDs, res,
|
fromPos, toPos, joinedRoomIDs, res,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -590,7 +585,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the current sync position which we will base the sync response on.
|
// Get the current sync position which we will base the sync response on.
|
||||||
toPos, err = d.SyncPositionTx(ctx, txn)
|
toPos, err = d.syncPositionTx(ctx, txn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -649,7 +644,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
||||||
res.Rooms.Join[roomID] = *jr
|
res.Rooms.Join[roomID] = *jr
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = d.AddInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
|
if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -668,7 +663,7 @@ func (d *Database) CompleteSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use a zero value SyncPosition for fromPos so all EDU states are added.
|
// Use a zero value SyncPosition for fromPos so all EDU states are added.
|
||||||
err = d.AddEDUDeltaToResponse(
|
err = d.addEDUDeltaToResponse(
|
||||||
types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
|
types.NewStreamToken(0, 0), toPos, joinedRoomIDs, res,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -688,8 +683,7 @@ var txReadOnlySnapshot = sql.TxOptions{
|
||||||
ReadOnly: true,
|
ReadOnly: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO FIXME temporary public
|
func (d *Database) addInvitesToResponse(
|
||||||
func (d *Database) AddInvitesToResponse(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
userID string,
|
userID string,
|
||||||
fromPos, toPos types.StreamPosition,
|
fromPos, toPos types.StreamPosition,
|
||||||
|
|
|
@ -306,9 +306,6 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
|
|
||||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
|
|
||||||
// from sync.
|
|
||||||
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomID string, fromPos, toPos types.StreamPosition, limit int,
|
roomID string, fromPos, toPos types.StreamPosition, limit int,
|
||||||
|
@ -341,8 +338,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectEarlyEvents returns the earliest events in the given room, starting
|
|
||||||
// from a given position, up to a maximum of 'limit'.
|
|
||||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomID string, fromPos, toPos types.StreamPosition, limit int,
|
roomID string, fromPos, toPos types.StreamPosition, limit int,
|
||||||
|
|
|
@ -65,17 +65,12 @@ const selectMaxPositionInTopologySQL = "" +
|
||||||
"SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
|
"SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
|
||||||
" WHERE room_id = $1 ORDER BY stream_position DESC"
|
" WHERE room_id = $1 ORDER BY stream_position DESC"
|
||||||
|
|
||||||
const selectEventIDsFromPositionSQL = "" +
|
|
||||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
|
||||||
" WHERE room_id = $1 AND topological_position = $2"
|
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||||
selectEventIDsInRangeDESCStmt *sql.Stmt
|
selectEventIDsInRangeDESCStmt *sql.Stmt
|
||||||
selectPositionInTopologyStmt *sql.Stmt
|
selectPositionInTopologyStmt *sql.Stmt
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsFromPositionStmt *sql.Stmt
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
|
@ -99,9 +94,6 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,12 +109,9 @@ func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectEventIDsInRange selects the IDs of events which positions are within a
|
|
||||||
// given range in a given room's topological order.
|
|
||||||
// Returns an empty slice if no events match the given range.
|
|
||||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
fromPos, toPos, toMicroPos types.StreamPosition,
|
minDepth, maxDepth, maxStreamPos types.StreamPosition,
|
||||||
limit int, chronologicalOrder bool,
|
limit int, chronologicalOrder bool,
|
||||||
) (eventIDs []string, err error) {
|
) (eventIDs []string, err error) {
|
||||||
// Decide on the selection's order according to whether chronological order
|
// Decide on the selection's order according to whether chronological order
|
||||||
|
@ -135,7 +124,7 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query the event IDs.
|
// Query the event IDs.
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit)
|
rows, err := stmt.QueryContext(ctx, roomID, minDepth, maxDepth, maxDepth, maxStreamPos, limit)
|
||||||
if err == sql.ErrNoRows {
|
if err == sql.ErrNoRows {
|
||||||
// If no event matched the request, return an empty slice.
|
// If no event matched the request, return an empty slice.
|
||||||
return []string{}, nil
|
return []string{}, nil
|
||||||
|
@ -172,28 +161,3 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
|
||||||
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectEventIDsFromPosition returns the IDs of all events that have a given
|
|
||||||
// position in the topology of a given room.
|
|
||||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsFromPosition(
|
|
||||||
ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition,
|
|
||||||
) (eventIDs []string, err error) {
|
|
||||||
// Query the event IDs.
|
|
||||||
stmt := common.TxStmt(txn, s.selectEventIDsFromPositionStmt)
|
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, pos)
|
|
||||||
if err == sql.ErrNoRows {
|
|
||||||
// If no event matched the request, return an empty slice.
|
|
||||||
return []string{}, nil
|
|
||||||
} else if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Return the IDs.
|
|
||||||
var eventID string
|
|
||||||
for rows.Next() {
|
|
||||||
if err = rows.Scan(&eventID); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
eventIDs = append(eventIDs, eventID)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
|
@ -292,11 +292,10 @@ func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
|
||||||
db := MustCreateDatabase(t)
|
db := MustCreateDatabase(t)
|
||||||
events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
|
events, _ := SimpleRoom(t, testRoomID, testUserIDA, testUserIDB)
|
||||||
MustWriteEvents(t, db, events)
|
MustWriteEvents(t, db, events)
|
||||||
latest, latestStream, err := db.MaxTopologicalPosition(ctx, testRoomID)
|
from, err := db.MaxTopologicalPosition(ctx, testRoomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
|
t.Fatalf("failed to get MaxTopologicalPosition: %s", err)
|
||||||
}
|
}
|
||||||
from := types.NewTopologyToken(latest, latestStream)
|
|
||||||
// head towards the beginning of time
|
// head towards the beginning of time
|
||||||
to := types.NewTopologyToken(0, 0)
|
to := types.NewTopologyToken(0, 0)
|
||||||
|
|
||||||
|
@ -358,16 +357,14 @@ func TestGetEventsInRangeWithEventsSameDepth(t *testing.T) {
|
||||||
Depth: depth + 1,
|
Depth: depth + 1,
|
||||||
}))
|
}))
|
||||||
MustWriteEvents(t, db, events)
|
MustWriteEvents(t, db, events)
|
||||||
latestPos, latestStreamPos, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID())
|
fromLatest, err := db.EventPositionInTopology(ctx, events[len(events)-1].EventID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to get EventPositionInTopology: %s", err)
|
t.Fatalf("failed to get EventPositionInTopology: %s", err)
|
||||||
}
|
}
|
||||||
topoPos, streamPos, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2
|
fromFork, err := db.EventPositionInTopology(ctx, events[len(events)-3].EventID()) // Message 2
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to get EventPositionInTopology for event: %s", err)
|
t.Fatalf("failed to get EventPositionInTopology for event: %s", err)
|
||||||
}
|
}
|
||||||
fromLatest := types.NewTopologyToken(latestPos, latestStreamPos)
|
|
||||||
fromFork := types.NewTopologyToken(topoPos, streamPos)
|
|
||||||
// head towards the beginning of time
|
// head towards the beginning of time
|
||||||
to := types.NewTopologyToken(0, 0)
|
to := types.NewTopologyToken(0, 0)
|
||||||
|
|
||||||
|
@ -507,12 +504,10 @@ func assertEventsEqual(t *testing.T, msg string, checkRoomID bool, gots []gomatr
|
||||||
}
|
}
|
||||||
|
|
||||||
func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *types.TopologyToken {
|
func topologyTokenBefore(t *testing.T, db storage.Database, eventID string) *types.TopologyToken {
|
||||||
pos, spos, err := db.EventPositionInTopology(ctx, eventID)
|
tok, err := db.EventPositionInTopology(ctx, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to get EventPositionInTopology: %s", err)
|
t.Fatalf("failed to get EventPositionInTopology: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tok := types.NewTopologyToken(pos, spos)
|
|
||||||
tok.Decrement()
|
tok.Decrement()
|
||||||
return &tok
|
return &tok
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,13 +11,15 @@ import (
|
||||||
|
|
||||||
type AccountData interface {
|
type AccountData interface {
|
||||||
InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error)
|
InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error)
|
||||||
SelectAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
|
// SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of `lowPos` and inclusive of `hiPos`.
|
||||||
|
SelectAccountDataInRange(ctx context.Context, userID string, lowPos, hiPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
|
||||||
SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Invites interface {
|
type Invites interface {
|
||||||
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
|
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
|
||||||
DeleteInviteEvent(ctx context.Context, inviteEventID string) error
|
DeleteInviteEvent(ctx context.Context, inviteEventID string) error
|
||||||
|
// SelectInviteEventsInRange returns a map of room ID to invite events. The range is exclusive of `startPos` and inclusive of `endPos`.
|
||||||
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error)
|
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
}
|
}
|
||||||
|
@ -26,26 +28,30 @@ type Events interface {
|
||||||
SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
|
SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
|
||||||
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
|
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
|
||||||
|
// SelectRecentEvents returns events between the two stream positions: exclusive of `fromPos` and inclusive of `toPos`.
|
||||||
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||||
|
// Returns up to `limit` events.
|
||||||
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
|
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
|
||||||
|
// SelectEarlyEvents returns the earliest events in the given room, exclusive of `fromPos` and inclusive of `toPos`.
|
||||||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error)
|
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error)
|
||||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Topology keeps track of the depths and stream positions for all events.
|
||||||
|
// These positions are used as types.TopologyToken when backfilling events locally.
|
||||||
type Topology interface {
|
type Topology interface {
|
||||||
// InsertEventInTopology inserts the given event in the room's topology, based
|
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
||||||
// on the event's depth.
|
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
||||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
||||||
// SelectEventIDsInRange selects the IDs of events which positions are within a
|
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
||||||
// given range in a given room's topological order.
|
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
||||||
|
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
||||||
// Returns an empty slice if no events match the given range.
|
// Returns an empty slice if no events match the given range.
|
||||||
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
|
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
|
||||||
// SelectPositionInTopology returns the position of a given event in the
|
// SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to.
|
||||||
// topology of the room it belongs to.
|
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error)
|
||||||
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos, spos types.StreamPosition, err error)
|
// SelectMaxPositionInTopology returns the event which has the highest depth, and if there are multiple, the event with the highest stream position.
|
||||||
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
|
||||||
// SelectEventIDsFromPosition returns the IDs of all events that have a given
|
|
||||||
// position in the topology of a given room.
|
|
||||||
SelectEventIDsFromPosition(ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition) (eventIDs []string, err error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type CurrentRoomState interface {
|
type CurrentRoomState interface {
|
||||||
|
|
|
@ -119,8 +119,11 @@ func (t *TopologyToken) Decrement() {
|
||||||
depth := t.Positions[0]
|
depth := t.Positions[0]
|
||||||
pduPos := t.Positions[1]
|
pduPos := t.Positions[1]
|
||||||
if depth-1 <= 0 {
|
if depth-1 <= 0 {
|
||||||
|
// nothing can be lower than this
|
||||||
depth = 1
|
depth = 1
|
||||||
} else {
|
} else {
|
||||||
|
// this assumes that we will never have 1000 events all with the same
|
||||||
|
// depth. TODO: work out what the right PDU position is to use, probably needs a db hit.
|
||||||
depth--
|
depth--
|
||||||
pduPos += 1000
|
pduPos += 1000
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue