# # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2016 OpenMarket Ltd # Copyright (C) 2023 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # See the GNU Affero General Public License for more details: # . # # Originally licensed under the Apache License, Version 2.0: # . # # [This file includes modifications made by New Vector Limited] # # from typing import Hashable, Tuple from typing_extensions import Protocol from twisted.internet import defer, reactor from twisted.internet.base import ReactorBase from twisted.internet.defer import CancelledError, Deferred from synapse.logging.context import LoggingContext, current_context from synapse.util.async_helpers import Linearizer from tests import unittest class UnblockFunction(Protocol): def __call__(self, pump_reactor: bool = True) -> None: ... class LinearizerTestCase(unittest.TestCase): def _start_task( self, linearizer: Linearizer, key: Hashable ) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]: """Starts a task which acquires the linearizer lock, blocks, then completes. Args: linearizer: The `Linearizer`. key: The `Linearizer` key. Returns: A tuple containing: * A cancellable `Deferred` for the entire task. * A `Deferred` that resolves once the task acquires the lock. * A function that unblocks the task. Must be called by the caller to allow the task to release the lock and complete. """ acquired_d: "Deferred[None]" = Deferred() unblock_d: "Deferred[None]" = Deferred() async def task() -> None: async with linearizer.queue(key): acquired_d.callback(None) await unblock_d d = defer.ensureDeferred(task()) def unblock(pump_reactor: bool = True) -> None: unblock_d.callback(None) # The next task, if it exists, will acquire the lock and require a kick of # the reactor to advance. if pump_reactor: self._pump() return d, acquired_d, unblock def _pump(self) -> None: """Pump the reactor to advance `Linearizer`s.""" assert isinstance(reactor, ReactorBase) while reactor.getDelayedCalls(): reactor.runUntilCurrent() def test_linearizer(self) -> None: """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() key = object() _, acquired_d1, unblock1 = self._start_task(linearizer, key) self.assertTrue(acquired_d1.called) _, acquired_d2, unblock2 = self._start_task(linearizer, key) self.assertFalse(acquired_d2.called) # Once the first task is done, the second task can continue. unblock1() self.assertTrue(acquired_d2.called) unblock2() def test_linearizer_is_queued(self) -> None: """Tests `Linearizer.is_queued`. Runs through the same scenario as `test_linearizer`. """ linearizer = Linearizer() key = object() _, acquired_d1, unblock1 = self._start_task(linearizer, key) self.assertTrue(acquired_d1.called) # Since the first task acquires the lock immediately, "is_queued" should return # false. self.assertFalse(linearizer.is_queued(key)) _, acquired_d2, unblock2 = self._start_task(linearizer, key) self.assertFalse(acquired_d2.called) # Now the second task is queued up behind the first. self.assertTrue(linearizer.is_queued(key)) unblock1() # And now the second task acquires the lock and nothing is in the queue again. self.assertTrue(acquired_d2.called) self.assertFalse(linearizer.is_queued(key)) unblock2() self.assertFalse(linearizer.is_queued(key)) def test_lots_of_queued_things(self) -> None: """Tests lots of fast things queued up behind a slow thing. The stack should *not* explode when the slow thing completes. """ linearizer = Linearizer() key = "" async def func(i: int) -> None: with LoggingContext("func(%s)" % i) as lc: async with linearizer.queue(key): self.assertEqual(current_context(), lc) self.assertEqual(current_context(), lc) _, _, unblock = self._start_task(linearizer, key) for i in range(1, 100): defer.ensureDeferred(func(i)) d = defer.ensureDeferred(func(1000)) unblock() self.successResultOf(d) def test_multiple_entries(self) -> None: """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) key = object() _, acquired_d1, unblock1 = self._start_task(limiter, key) self.assertTrue(acquired_d1.called) _, acquired_d2, unblock2 = self._start_task(limiter, key) self.assertTrue(acquired_d2.called) _, acquired_d3, unblock3 = self._start_task(limiter, key) self.assertTrue(acquired_d3.called) # These next two tasks have to wait. _, acquired_d4, unblock4 = self._start_task(limiter, key) self.assertFalse(acquired_d4.called) _, acquired_d5, unblock5 = self._start_task(limiter, key) self.assertFalse(acquired_d5.called) # Once the first task completes, the fourth task can continue. unblock1() self.assertTrue(acquired_d4.called) self.assertFalse(acquired_d5.called) # Once the third task completes, the fifth task can continue. unblock3() self.assertTrue(acquired_d5.called) # Make all tasks finish. unblock2() unblock4() unblock5() # The next task shouldn't have to wait. _, acquired_d6, unblock6 = self._start_task(limiter, key) self.assertTrue(acquired_d6) unblock6() def test_cancellation(self) -> None: """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() d1, acquired_d1, unblock1 = self._start_task(linearizer, key) self.assertTrue(acquired_d1.called) # Create a second task, waiting for the first task. d2, acquired_d2, _ = self._start_task(linearizer, key) self.assertFalse(acquired_d2.called) # Create a third task, waiting for the second task. d3, acquired_d3, unblock3 = self._start_task(linearizer, key) self.assertFalse(acquired_d3.called) # Cancel the waiting second task. d2.cancel() unblock1() self.successResultOf(d1) self.assertTrue(d2.called) self.failureResultOf(d2, CancelledError) # The third task should continue running. self.assertTrue( acquired_d3.called, "Third task did not get the lock after the second task was cancelled", ) unblock3() self.successResultOf(d3) def test_cancellation_during_sleep(self) -> None: """Tests cancellation during the sleep just after waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() d1, acquired_d1, unblock1 = self._start_task(linearizer, key) self.assertTrue(acquired_d1.called) # Create a second task, waiting for the first task. d2, acquired_d2, _ = self._start_task(linearizer, key) self.assertFalse(acquired_d2.called) # Create a third task, waiting for the second task. d3, acquired_d3, unblock3 = self._start_task(linearizer, key) self.assertFalse(acquired_d3.called) # Once the first task completes, cancel the waiting second task while it is # sleeping just after acquiring the lock. unblock1(pump_reactor=False) self.successResultOf(d1) d2.cancel() self._pump() self.assertTrue(d2.called) self.failureResultOf(d2, CancelledError) # The third task should continue running. self.assertTrue( acquired_d3.called, "Third task did not get the lock after the second task was cancelled", ) unblock3() self.successResultOf(d3)