diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 43e2ce8ba..ba90706e8 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -156,11 +156,13 @@ func (p *PDUStreamProvider) IncrementalSync( if req.WantFullState { if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed") + _ = snapshot.Rollback() return } } else { if stateDeltas, syncJoinedRooms, err = snapshot.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") + _ = snapshot.Rollback() return } } @@ -175,6 +177,7 @@ func (p *PDUStreamProvider) IncrementalSync( if err = p.addIgnoredUsersToFilter(ctx, snapshot, req, &eventFilter); err != nil { req.Log.WithError(err).Error("unable to update event filter with ignored users") + _ = snapshot.Rollback() } newPos = from @@ -194,7 +197,11 @@ func (p *PDUStreamProvider) IncrementalSync( var pos types.StreamPosition if pos, err = p.addRoomDeltaToResponse(ctx, snapshot, req.Device, newRange, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") - return to + _ = snapshot.Rollback() + if err == context.DeadlineExceeded || err == context.Canceled { + return newPos + } + continue // return to } // Reset the position, as it is only for the special case of newly joined rooms if delta.NewlyJoined {