diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 44b45a890..284371073 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -77,9 +77,8 @@ func (n *Notifier) OnNewEvent( // This needs to be done PRIOR to waking up users as they will read this value. n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos + n.currPos.ApplyUpdates(posUpdate) n.removeEmptyUserStreams() if ev != nil { @@ -113,11 +112,11 @@ func (n *Notifier) OnNewEvent( } } - n.wakeupUsers(usersToNotify, peekingDevicesToNotify, latestPos) + n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), latestPos) + n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, nil, latestPos) + n.wakeupUsers(userIDs, nil, n.currPos) } else { log.WithFields(log.Fields{ "posUpdate": posUpdate.String, @@ -155,10 +154,9 @@ func (n *Notifier) OnNewSendToDevice( ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos - n.wakeupUserDevice(userID, deviceIDs, latestPos) + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUserDevice(userID, deviceIDs, n.currPos) } // OnNewReceipt updates the current position @@ -168,10 +166,9 @@ func (n *Notifier) OnNewTyping( ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos - n.wakeupUsers(n.joinedUsers(roomID), nil, latestPos) + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) } // OnNewReceipt updates the current position @@ -181,10 +178,9 @@ func (n *Notifier) OnNewReceipt( ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos - n.wakeupUsers(n.joinedUsers(roomID), nil, latestPos) + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) } func (n *Notifier) OnNewKeyChange( @@ -192,9 +188,9 @@ func (n *Notifier) OnNewKeyChange( ) { n.streamLock.Lock() defer n.streamLock.Unlock() - latestPos := n.currPos.WithUpdates(posUpdate) - n.currPos = latestPos - n.wakeupUsers([]string{wakeUserID}, nil, latestPos) + + n.currPos.ApplyUpdates(posUpdate) + n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) } // GetListener returns a UserStreamListener that can be used to wait for diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 1102f2e16..a2fe05e06 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -294,8 +294,8 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea // Add the updates into the sync response. for _, event := range events { res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent) + res.NextBatch.SendToDevicePosition++ } - res.NextBatch.SendToDevicePosition++ } return res, err