From 3d6df846580ce6ef8769945e6990af2f44251e40 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 13:59:55 +0100 Subject: [PATCH 1/2] Test and fix support for cancellation in Linearizer --- synapse/util/async.py | 28 ++++++++++++++++++++++------ tests/util/test_linearizer.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/synapse/util/async.py b/synapse/util/async.py index 5a50d9700..a7094e2fb 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -184,13 +184,13 @@ class Linearizer(object): # key_to_defer is a map from the key to a 2 element list where # the first element is the number of things executing, and - # the second element is a deque of deferreds for the things blocked from - # executing. + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -198,12 +198,28 @@ class Linearizer(object): # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() - entry[1].append(new_defer) + entry[1][new_defer] = 1 logger.info( "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) - yield make_deferred_yieldable(new_defer) + try: + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 @@ -238,7 +254,7 @@ class Linearizer(object): entry[0] -= 1 if entry[1]: - next_def = entry[1].popleft() + (next_def, _) = entry[1].popitem(last=False) # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c9563124f..4729bd5a0 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -17,6 +17,7 @@ from six.moves import range from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError from synapse.util import Clock, logcontext from synapse.util.async import Linearizer @@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase): d6 = limiter.queue(key) with (yield d6): pass + + @defer.inlineCallbacks + def test_cancellation(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + d3 = linearizer.queue(key) + self.assertFalse(d3.called) + + d2.cancel() + + with cm1: + pass + + self.assertTrue(d2.called) + try: + yield d2 + self.fail("Expected d2 to raise CancelledError") + except CancelledError: + pass + + with (yield d3): + pass From 5c30cb709af1295b2285d802d1e40b91382e7af0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 14:01:36 +0100 Subject: [PATCH 2/2] Changelog --- changelog.d/3572.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3572.misc diff --git a/changelog.d/3572.misc b/changelog.d/3572.misc new file mode 100644 index 000000000..8908324e6 --- /dev/null +++ b/changelog.d/3572.misc @@ -0,0 +1 @@ +Merge Linearizer and Limiter