forked from MirrorHub/synapse
Add two different columns for ordering the events table, one which can be used for pagination and one which can be as tokens for notifying clients. Also add a 'processed' field which is currently always set to True
This commit is contained in:
parent
1c2caacd67
commit
fc26275bb3
4 changed files with 33 additions and 19 deletions
|
@ -63,7 +63,7 @@ class FederationEventHandler(object):
|
||||||
Deferred: Resolved when it has successfully been queued for
|
Deferred: Resolved when it has successfully been queued for
|
||||||
processing.
|
processing.
|
||||||
"""
|
"""
|
||||||
yield self._fill_out_prev_events(event)
|
yield self.fill_out_prev_events(event)
|
||||||
|
|
||||||
pdu = self.pdu_codec.pdu_from_event(event)
|
pdu = self.pdu_codec.pdu_from_event(event)
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ class FederationEventHandler(object):
|
||||||
yield self.event_handler.on_receive(new_state_event)
|
yield self.event_handler.on_receive(new_state_event)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _fill_out_prev_events(self, event):
|
def fill_out_prev_events(self, event):
|
||||||
if hasattr(event, "prev_events"):
|
if hasattr(event, "prev_events"):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
|
@ -43,9 +43,10 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(DataStore, self).__init__(hs)
|
super(DataStore, self).__init__(hs)
|
||||||
self.event_factory = hs.get_event_factory()
|
self.event_factory = hs.get_event_factory()
|
||||||
|
self.hs = hs
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def persist_event(self, event):
|
def persist_event(self, event, backfilled=False):
|
||||||
if event.type == RoomMemberEvent.TYPE:
|
if event.type == RoomMemberEvent.TYPE:
|
||||||
yield self._store_room_member(event)
|
yield self._store_room_member(event)
|
||||||
elif event.type == FeedbackEvent.TYPE:
|
elif event.type == FeedbackEvent.TYPE:
|
||||||
|
@ -57,7 +58,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
elif event.type == RoomTopicEvent.TYPE:
|
elif event.type == RoomTopicEvent.TYPE:
|
||||||
yield self._store_room_topic(event)
|
yield self._store_room_topic(event)
|
||||||
|
|
||||||
ret = yield self._store_event(event)
|
ret = yield self._store_event(event, backfilled)
|
||||||
defer.returnValue(ret)
|
defer.returnValue(ret)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -79,14 +80,23 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
defer.returnValue(event)
|
defer.returnValue(event)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _store_event(self, event):
|
def _store_event(self, event, backfilled):
|
||||||
|
# FIXME (erikj): This should be removed when we start amalgamating
|
||||||
|
# event and pdu storage.
|
||||||
|
yield self.hs.get_federation().fill_out_prev_events(event)
|
||||||
|
|
||||||
vals = {
|
vals = {
|
||||||
|
"topological_ordering": event.depth,
|
||||||
"event_id": event.event_id,
|
"event_id": event.event_id,
|
||||||
"type": event.type,
|
"type": event.type,
|
||||||
"room_id": event.room_id,
|
"room_id": event.room_id,
|
||||||
"content": json.dumps(event.content),
|
"content": json.dumps(event.content),
|
||||||
|
"processed": True,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if backfilled:
|
||||||
|
vals["token_ordering"] = "-1"
|
||||||
|
|
||||||
unrec = {
|
unrec = {
|
||||||
k: v
|
k: v
|
||||||
for k, v in event.get_full_dict().items()
|
for k, v in event.get_full_dict().items()
|
||||||
|
@ -96,7 +106,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
|
|
||||||
yield self._simple_insert("events", vals)
|
yield self._simple_insert("events", vals)
|
||||||
|
|
||||||
if hasattr(event, "state_key"):
|
if not backfilled and hasattr(event, "state_key"):
|
||||||
vals = {
|
vals = {
|
||||||
"event_id": event.event_id,
|
"event_id": event.event_id,
|
||||||
"room_id": event.room_id,
|
"room_id": event.room_id,
|
||||||
|
|
|
@ -14,12 +14,15 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS events(
|
CREATE TABLE IF NOT EXISTS events(
|
||||||
ordering INTEGER PRIMARY KEY AUTOINCREMENT,
|
token_ordering INTEGER AUTOINCREMENT,
|
||||||
|
topological_ordering INTEGER NOT NULL,
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
type TEXT NOT NULL,
|
type TEXT NOT NULL,
|
||||||
room_id TEXT,
|
room_id TEXT NOT NULL,
|
||||||
content TEXT,
|
content TEXT NOT NULL,
|
||||||
unrecognized_keys TEXT
|
unrecognized_keys TEXT,
|
||||||
|
processed BOOL NOT NULL,
|
||||||
|
CONSTRAINT ev_uniq UNIQUE (event_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS state_events(
|
CREATE TABLE IF NOT EXISTS state_events(
|
||||||
|
@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS current_state_events(
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
type TEXT NOT NULL,
|
type TEXT NOT NULL,
|
||||||
state_key TEXT NOT NULL,
|
state_key TEXT NOT NULL,
|
||||||
CONSTRAINT uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
|
CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS room_memberships(
|
CREATE TABLE IF NOT EXISTS room_memberships(
|
||||||
|
|
|
@ -73,13 +73,14 @@ class StreamStore(SQLBaseStore):
|
||||||
# Constraints and ordering depend on direction.
|
# Constraints and ordering depend on direction.
|
||||||
if from_key < to_key:
|
if from_key < to_key:
|
||||||
sql += (
|
sql += (
|
||||||
"AND e.ordering > ? AND e.ordering < ? "
|
"AND e.token_ordering > ? AND e.token_ordering < ? "
|
||||||
"ORDER BY ordering ASC LIMIT %(limit)d "
|
"ORDER BY token_ordering, rowid ASC LIMIT %(limit)d "
|
||||||
) % {"limit": limit}
|
) % {"limit": limit}
|
||||||
else:
|
else:
|
||||||
sql += (
|
sql += (
|
||||||
"AND e.ordering < ? AND e.ordering > ? "
|
"AND e.token_ordering < ? "
|
||||||
"ORDER BY ordering DESC LIMIT %(limit)d "
|
"AND e.token_ordering > ? "
|
||||||
|
"ORDER BY e.token_ordering, rowid DESC LIMIT %(limit)d "
|
||||||
) % {"limit": int(limit)}
|
) % {"limit": int(limit)}
|
||||||
|
|
||||||
rows = yield self._execute_and_decode(
|
rows = yield self._execute_and_decode(
|
||||||
|
@ -91,9 +92,9 @@ class StreamStore(SQLBaseStore):
|
||||||
|
|
||||||
if rows:
|
if rows:
|
||||||
if from_key < to_key:
|
if from_key < to_key:
|
||||||
key = max([r["ordering"] for r in rows])
|
key = max([r["token_ordering"] for r in rows])
|
||||||
else:
|
else:
|
||||||
key = min([r["ordering"] for r in rows])
|
key = min([r["token_ordering"] for r in rows])
|
||||||
else:
|
else:
|
||||||
key = to_key
|
key = to_key
|
||||||
|
|
||||||
|
@ -105,7 +106,7 @@ class StreamStore(SQLBaseStore):
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT * FROM events WHERE room_id = ? "
|
"SELECT * FROM events WHERE room_id = ? "
|
||||||
"ORDER BY ordering DESC LIMIT ? "
|
"ORDER BY token_ordering, rowid DESC LIMIT ? "
|
||||||
)
|
)
|
||||||
|
|
||||||
rows = yield self._execute_and_decode(
|
rows = yield self._execute_and_decode(
|
||||||
|
@ -120,7 +121,7 @@ class StreamStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_events_max_id(self):
|
def get_room_events_max_id(self):
|
||||||
res = yield self._execute_and_decode(
|
res = yield self._execute_and_decode(
|
||||||
"SELECT MAX(ordering) as m FROM events"
|
"SELECT MAX(token_ordering) as m FROM events"
|
||||||
)
|
)
|
||||||
|
|
||||||
if not res:
|
if not res:
|
||||||
|
|
Loading…
Reference in a new issue