0
0
Fork 0
mirror of https://github.com/matrix-org/dendrite synced 2024-11-19 00:00:55 +01:00

Fix positions, add ApplyUpdates

This commit is contained in:
Neil Alexander 2020-12-11 10:42:55 +00:00
parent 9eff386e21
commit 0634d9fed1
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 26 additions and 19 deletions

View file

@ -134,7 +134,7 @@ func DeviceListCatchup(
Partition: queryRes.Partition, Partition: queryRes.Partition,
Offset: queryRes.Offset, Offset: queryRes.Offset,
}) })
res.NextBatch = res.NextBatch.WithUpdates(to) res.NextBatch.ApplyUpdates(to)
return hasNew, nil return hasNew, nil
} }

View file

@ -575,6 +575,7 @@ func (d *Database) addTypingDeltaToResponse(
jr = *types.NewJoinResponse() jr = *types.NewJoinResponse()
} }
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.NextBatch.TypingPosition++
res.Rooms.Join[roomID] = jr res.Rooms.Join[roomID] = jr
} }
} }
@ -630,6 +631,7 @@ func (d *Database) addReceiptDeltaToResponse(
} }
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
res.NextBatch.ReceiptPosition++
res.Rooms.Join[roomID] = jr res.Rooms.Join[roomID] = jr
} }
@ -686,7 +688,7 @@ func (d *Database) IncrementalSync(
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
wantFullState bool, wantFullState bool,
) (*types.Response, error) { ) (*types.Response, error) {
res.NextBatch = res.NextBatch.WithUpdates(toPos) res.NextBatch = fromPos.WithUpdates(toPos)
var joinedRoomIDs []string var joinedRoomIDs []string
var err error var err error
@ -778,7 +780,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
To: toPos.PDUPosition, To: toPos.PDUPosition,
} }
res.NextBatch = res.NextBatch.WithUpdates(toPos) res.NextBatch.ApplyUpdates(toPos)
// Extract room state and recent events for all rooms the user is joined to. // Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)

View file

@ -427,7 +427,7 @@ func (rp *RequestPool) appendAccountData(
// or timeout=0, or full_state=true, in any of the cases the request should // or timeout=0, or full_state=true, in any of the cases the request should
// return immediately. // return immediately.
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool { func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
if syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState { if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
return true return true
} }
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID) waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)

View file

@ -187,28 +187,33 @@ func (t *StreamingToken) IsEmpty() bool {
// If the latter StreamingToken contains a field that is not 0, it is considered an update, // If the latter StreamingToken contains a field that is not 0, it is considered an update,
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called. // and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
// If the other token has a log, they will replace any existing log on this token. // If the other token has a log, they will replace any existing log on this token.
func (t *StreamingToken) WithUpdates(other StreamingToken) (ret StreamingToken) { func (t *StreamingToken) WithUpdates(other StreamingToken) StreamingToken {
ret = *t ret := *t
ret.ApplyUpdates(other)
return ret
}
// ApplyUpdates applies any changes from the supplied StreamingToken. If the supplied
// streaming token contains any positions that are not 0, they are considered updates
// and will overwrite the value in the token.
func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
switch { switch {
case other.PDUPosition > 0: case other.PDUPosition > 0:
ret.PDUPosition = other.PDUPosition t.PDUPosition = other.PDUPosition
case other.TypingPosition > 0: case other.TypingPosition > 0:
ret.TypingPosition = other.TypingPosition t.TypingPosition = other.TypingPosition
case other.ReceiptPosition > 0: case other.ReceiptPosition > 0:
ret.ReceiptPosition = other.ReceiptPosition t.ReceiptPosition = other.ReceiptPosition
case other.SendToDevicePosition > 0: case other.SendToDevicePosition > 0:
ret.SendToDevicePosition = other.SendToDevicePosition t.SendToDevicePosition = other.SendToDevicePosition
} }
ret.Logs = make(map[string]*LogPosition) if t.Logs == nil {
for name := range t.Logs { t.Logs = make(map[string]*LogPosition)
otherLog := other.Log(name) }
if otherLog == nil { for name, pos := range other.Logs {
continue copy := *pos
} t.Logs[name] = &copy
copy := *otherLog
ret.Logs[name] = &copy
} }
return ret
} }
type TopologyToken struct { type TopologyToken struct {