forked from MirrorHub/synapse
Use event IDs instead of dumping event content in the txns table.
This commit is contained in:
parent
21fd84dcb8
commit
b98cd03193
3 changed files with 26 additions and 20 deletions
|
@ -442,10 +442,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
||||||
new_txn_id = max(highest_txn_id, last_txn_id) + 1
|
new_txn_id = max(highest_txn_id, last_txn_id) + 1
|
||||||
|
|
||||||
# Insert new txn into txn table
|
# Insert new txn into txn table
|
||||||
|
event_ids = [e.event_id for e in events]
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"INSERT INTO application_services_txns(as_id, txn_id, content) "
|
"INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
|
||||||
"VALUES(?,?,?)",
|
"VALUES(?,?,?)",
|
||||||
(service.id, new_txn_id, encode_canonical_json(events))
|
(service.id, new_txn_id, json.dumps(event_ids))
|
||||||
)
|
)
|
||||||
return AppServiceTransaction(
|
return AppServiceTransaction(
|
||||||
service=service, id=new_txn_id, events=events
|
service=service, id=new_txn_id, events=events
|
||||||
|
@ -491,7 +492,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
||||||
dict(last_txn=txn_id)
|
dict(last_txn=txn_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Delete txn contents
|
# Delete txn
|
||||||
self._simple_delete_txn(
|
self._simple_delete_txn(
|
||||||
txn, "application_services_txns",
|
txn, "application_services_txns",
|
||||||
dict(txn_id=txn_id, as_id=service.id)
|
dict(txn_id=txn_id, as_id=service.id)
|
||||||
|
@ -526,10 +527,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
|
||||||
# the min(txn_id) part will force a row, so entry may not be None
|
# the min(txn_id) part will force a row, so entry may not be None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
event_ids = json.loads(entry["event_ids"])
|
||||||
|
events = self._get_events_txn(event_ids)
|
||||||
|
|
||||||
return AppServiceTransaction(
|
return AppServiceTransaction(
|
||||||
service=service, id=entry["txn_id"], events=json.loads(
|
service=service, id=entry["txn_id"], events=events
|
||||||
entry["content"]
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_last_txn(self, txn, service_id):
|
def _get_last_txn(self, txn, service_id):
|
||||||
|
|
|
@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS application_services_state(
|
||||||
CREATE TABLE IF NOT EXISTS application_services_txns(
|
CREATE TABLE IF NOT EXISTS application_services_txns(
|
||||||
as_id INTEGER NOT NULL,
|
as_id INTEGER NOT NULL,
|
||||||
txn_id INTEGER NOT NULL,
|
txn_id INTEGER NOT NULL,
|
||||||
content TEXT NOT NULL,
|
event_ids TEXT NOT NULL,
|
||||||
UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
|
UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -167,11 +167,11 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
(id, state, txn)
|
(id, state, txn)
|
||||||
)
|
)
|
||||||
|
|
||||||
def _insert_txn(self, as_id, txn_id, content):
|
def _insert_txn(self, as_id, txn_id, events):
|
||||||
return self.db_pool.runQuery(
|
return self.db_pool.runQuery(
|
||||||
"INSERT INTO application_services_txns(as_id, txn_id, content) "
|
"INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
|
||||||
"VALUES(?,?,?)",
|
"VALUES(?,?,?)",
|
||||||
(as_id, txn_id, json.dumps(content))
|
(as_id, txn_id, json.dumps([e.event_id for e in events]))
|
||||||
)
|
)
|
||||||
|
|
||||||
def _set_last_txn(self, as_id, txn_id):
|
def _set_last_txn(self, as_id, txn_id):
|
||||||
|
@ -255,7 +255,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_create_appservice_txn_first(self):
|
def test_create_appservice_txn_first(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"type": "nothing"}, {"type": "here"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
txn = yield self.store.create_appservice_txn(service, events)
|
txn = yield self.store.create_appservice_txn(service, events)
|
||||||
self.assertEquals(txn.id, 1)
|
self.assertEquals(txn.id, 1)
|
||||||
self.assertEquals(txn.events, events)
|
self.assertEquals(txn.events, events)
|
||||||
|
@ -264,7 +264,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_create_appservice_txn_older_last_txn(self):
|
def test_create_appservice_txn_older_last_txn(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"type": "nothing"}, {"type": "here"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
yield self._set_last_txn(service.id, 9643) # AS is falling behind
|
yield self._set_last_txn(service.id, 9643) # AS is falling behind
|
||||||
yield self._insert_txn(service.id, 9644, events)
|
yield self._insert_txn(service.id, 9644, events)
|
||||||
yield self._insert_txn(service.id, 9645, events)
|
yield self._insert_txn(service.id, 9645, events)
|
||||||
|
@ -276,7 +276,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_create_appservice_txn_up_to_date_last_txn(self):
|
def test_create_appservice_txn_up_to_date_last_txn(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"type": "nothing"}, {"type": "here"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
yield self._set_last_txn(service.id, 9643)
|
yield self._set_last_txn(service.id, 9643)
|
||||||
txn = yield self.store.create_appservice_txn(service, events)
|
txn = yield self.store.create_appservice_txn(service, events)
|
||||||
self.assertEquals(txn.id, 9644)
|
self.assertEquals(txn.id, 9644)
|
||||||
|
@ -286,7 +286,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_create_appservice_txn_up_fuzzing(self):
|
def test_create_appservice_txn_up_fuzzing(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"type": "nothing"}, {"type": "here"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
yield self._set_last_txn(service.id, 9643)
|
yield self._set_last_txn(service.id, 9643)
|
||||||
|
|
||||||
# dump in rows with higher IDs to make sure the queries aren't wrong.
|
# dump in rows with higher IDs to make sure the queries aren't wrong.
|
||||||
|
@ -307,7 +307,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_complete_appservice_txn_first_txn(self):
|
def test_complete_appservice_txn_first_txn(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"foo": "bar"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
txn_id = 1
|
txn_id = 1
|
||||||
|
|
||||||
yield self._insert_txn(service.id, txn_id, events)
|
yield self._insert_txn(service.id, txn_id, events)
|
||||||
|
@ -329,7 +329,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_complete_appservice_txn_existing_in_state_table(self):
|
def test_complete_appservice_txn_existing_in_state_table(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"foo": "bar"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
txn_id = 5
|
txn_id = 5
|
||||||
yield self._set_last_txn(service.id, 4)
|
yield self._set_last_txn(service.id, 4)
|
||||||
yield self._insert_txn(service.id, txn_id, events)
|
yield self._insert_txn(service.id, txn_id, events)
|
||||||
|
@ -360,12 +360,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_get_oldest_unsent_txn(self):
|
def test_get_oldest_unsent_txn(self):
|
||||||
service = Mock(id=self.as_list[0]["id"])
|
service = Mock(id=self.as_list[0]["id"])
|
||||||
events = [{"type": "nothing"}, {"type": "here"}]
|
events = [Mock(event_id="e1"), Mock(event_id="e2")]
|
||||||
|
other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
|
||||||
|
|
||||||
yield self._insert_txn(self.as_list[1]["id"], 9, {"badger": "mushroom"})
|
# we aren't testing store._base stuff here, so mock this out
|
||||||
|
self.store._get_events_txn = Mock(return_value=events)
|
||||||
|
|
||||||
|
yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
|
||||||
yield self._insert_txn(service.id, 10, events)
|
yield self._insert_txn(service.id, 10, events)
|
||||||
yield self._insert_txn(service.id, 11, [{"foo":"bar"}])
|
yield self._insert_txn(service.id, 11, other_events)
|
||||||
yield self._insert_txn(service.id, 12, [{"argh":"bargh"}])
|
yield self._insert_txn(service.id, 12, other_events)
|
||||||
|
|
||||||
txn = yield self.store.get_oldest_unsent_txn(service)
|
txn = yield self.store.get_oldest_unsent_txn(service)
|
||||||
self.assertEquals(service, txn.service)
|
self.assertEquals(service, txn.service)
|
||||||
|
|
Loading…
Reference in a new issue