From 142934084a374c3e47b63939526827f5afa7410d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 May 2015 15:40:21 +0100 Subject: [PATCH] Count and loop --- synapse/storage/_base.py | 2 +- synapse/storage/events.py | 64 +++++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c20ff3a57..97bf42469 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,7 +301,7 @@ class SQLBaseStore(object): self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] - self._event_fetch_ongoing = False + self._event_fetch_ongoing = 0 self.database_engine = hs.database_engine diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0859518b1..a6b2e7677 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore): def do_fetch(txn): event_list = [] - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] + while True: + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] - if not event_list: - return + if not event_list: + return - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - rows = self._fetch_event_rows(txn, event_ids) + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) - row_dict = { - r["event_id"]: r - for r in rows - } + row_dict = { + r["event_id"]: r + for r in rows + } - for ids, d in event_list: - d.callback( - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) - except Exception as e: - for _, d in event_list: - try: - reactor.callFromThread(d.errback, e) - except: - pass - finally: - with self._event_fetch_lock: - self._event_fetch_ongoing = False + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass def cb(rows): return defer.gatherResults([ @@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore): (events, d) ) - if not self._event_fetch_ongoing: + if self._event_fetch_ongoing < 3: + self._event_fetch_ongoing += 1 self.runInteraction( "do_fetch", do_fetch ) - self._event_fetch_ongoing = True res = yield d