Handle RRs which arrive before their events

This commit is contained in:
Richard van der Hoff 2018-06-01 14:01:43 +01:00
parent 857e6fd8b6
commit 9f797a24a4

View file

@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.api.errors import NotFoundError
from ._base import SQLBaseStore from ._base import SQLBaseStore
from .util.id_generators import StreamIdGenerator from .util.id_generators import StreamIdGenerator
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
@ -340,32 +340,26 @@ class ReceiptsStore(ReceiptsWorkerStore):
allow_none=True allow_none=True
) )
if not res: stream_ordering = int(res["stream_ordering"]) if res else None
raise NotFoundError(
"Cannot set read receipt on unknown event %s" % (
event_id,
),
)
stream_ordering = int(res["stream_ordering"])
# We don't want to clobber receipts for more recent events, so we # We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts # have to compare orderings of existing receipts
sql = ( if stream_ordering is not None:
"SELECT stream_ordering, event_id FROM events" sql = (
" INNER JOIN receipts_linearized as r USING (event_id, room_id)" "SELECT stream_ordering, event_id FROM events"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
) " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
txn.execute(sql, (room_id, receipt_type, user_id)) )
txn.execute(sql, (room_id, receipt_type, user_id))
for so, eid in txn: for so, eid in txn:
if int(so) >= stream_ordering: if int(so) >= stream_ordering:
logger.debug( logger.debug(
"Ignoring new receipt for %s in favour of existing " "Ignoring new receipt for %s in favour of existing "
"one for later event %s", "one for later event %s",
event_id, eid, event_id, eid,
) )
return False return False
txn.call_after( txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type) self.get_receipts_for_room.invalidate, (room_id, receipt_type)
@ -413,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
} }
) )
if receipt_type == "m.read": if receipt_type == "m.read" and stream_ordering is not None:
self._remove_old_push_actions_before_txn( self._remove_old_push_actions_before_txn(
txn, txn,
room_id=room_id, room_id=room_id,