forked from MirrorHub/synapse
Merge pull request #2532 from matrix-org/rav/fix_linearizer
Fix stackoverflow and logcontexts from linearizer
commit cc794d60e7

2 changed files with 46 additions and 6 deletions
synapse/util/async.py

@@ -203,7 +203,26 @@ class Linearizer(object):
             except:
                 logger.exception("Unexpected exception in Linearizer")
 
-            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            logger.info("Acquired linearizer lock %r for key %r", self.name,
+                        key)
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (There's no particular need for it to happen before we return
+            # the context manager, but it needs to happen while we hold the
+            # lock, and the context manager's exit code must be synchronous,
+            # so actually this is the only sensible place.
+            yield run_on_reactor()
+
+        else:
+            logger.info("Acquired uncontended linearizer lock %r for key %r",
+                        self.name, key)
 
         @contextmanager
         def _ctx_manager():
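The comment block above describes the failure mode: if a claimant's critical section completes synchronously, releasing the lock runs the next claimant's callbacks directly, so a long queue unwinds as one deep recursive call chain. The sketch below is my own illustration of that pattern in plain Twisted (none of these names come from synapse); the second function shows the moral equivalent of the patch's sleep(0) / run_on_reactor(), bouncing each step through the reactor so the stack unwinds between claimants. It needs a running reactor to actually drain the queue.

# Illustrative sketch only (invented names, plain Twisted, not synapse code).
from twisted.internet import defer, reactor


def drain_queue_synchronously(n):
    # Each waiter's callback fires the next waiter directly, so releasing the
    # first one unwinds the whole queue as one recursive call chain; a large
    # enough n blows the recursion limit (a maximum-recursion-depth failure).
    waiters = [defer.Deferred() for _ in range(n)]
    for i in range(n - 1):
        waiters[i].addCallback(lambda _, nxt=waiters[i + 1]: nxt.callback(None))
    waiters[0].callback(None)


def drain_queue_via_reactor(n):
    # Same queue, but each step schedules the next wake-up through the reactor
    # (the moral equivalent of the patch's sleep(0) / run_on_reactor()), so the
    # stack is fully unwound between claimants.
    waiters = [defer.Deferred() for _ in range(n)]

    def fire_next(_, nxt):
        reactor.callLater(0, nxt.callback, None)

    for i in range(n - 1):
        waiters[i].addCallback(fire_next, waiters[i + 1])
    waiters[0].callback(None)
    return waiters[-1]  # fires once the reactor has worked through the queue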
@@ -211,7 +230,8 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                new_defer.callback(None)
+                with PreserveLoggingContext():
+                    new_defer.callback(None)
                 current_d = self.key_to_defer.get(key)
                 if current_d is new_defer:
                     self.key_to_defer.pop(key, None)
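The second hunk fires the deferred inside PreserveLoggingContext(). My reading (not text from the PR): callback() runs the next claimant's code synchronously, so whatever logging context the releasing request is in at that moment would leak into the woken-up code unless the context is reset around the call. A toy stand-in for the idea, with invented names rather than synapse's real logcontext machinery:

# Toy illustration only; "current_context" stands in for synapse's thread-local
# logging context, and none of these names come from the real code.
from twisted.internet import defer

current_context = ["sentinel"]


def waiter_wakes_up(_):
    # The next claimant resumes here, synchronously, inside whatever context
    # the releasing code happened to be in when it fired the deferred.
    print("woken up in context: %s" % current_context[0])


d = defer.Deferred()
d.addCallback(waiter_wakes_up)

current_context[0] = "request-1"   # the request that is releasing the lock
d.callback(None)                   # prints "request-1": the context has leaked
# Wrapping callback() in something that restores a neutral context first, as
# PreserveLoggingContext() does in the patch, is what avoids this leak.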
tests/util/test_linearizer.py

@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+from synapse.util import async, logcontext
 from tests import unittest
 
 from twisted.internet import defer
@@ -38,7 +37,28 @@ class LinearizerTestCase(unittest.TestCase):
         with cm1:
             self.assertFalse(d2.called)
 
-        self.assertTrue(d2.called)
-
         with (yield d2):
             pass
+
+    def test_lots_of_queued_things(self):
+        # we have one slow thing, and lots of fast things queued up behind it.
+        # it should *not* explode the stack.
+        linearizer = Linearizer()
+
+        @defer.inlineCallbacks
+        def func(i, sleep=False):
+            with logcontext.LoggingContext("func(%s)" % i) as lc:
+                with (yield linearizer.queue("")):
+                    self.assertEqual(
+                        logcontext.LoggingContext.current_context(), lc)
+                    if sleep:
+                        yield async.sleep(0)
+
+                self.assertEqual(
+                    logcontext.LoggingContext.current_context(), lc)
+
+        func(0, sleep=True)
+        for i in xrange(1, 100):
+            func(i)
+
+        return func(1000)
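A note on how the new test exercises the fix, as I read the diff (this commentary is not part of the PR): func(0, sleep=True) takes the lock and yields to the reactor while still holding it, so every later func() call queues up behind it on the same key; each queued claimant completes synchronously once woken, which is exactly the shape that previously unwound as one deep recursive callback chain. Each claimant also asserts that it is still running in its own LoggingContext both inside the lock and after releasing it, covering the PreserveLoggingContext change. The final return func(1000) hands the test runner a deferred that only fires once the whole queue has drained. Assuming the usual trial-based workflow for synapse tests of this era, the case can be run on its own with something like: trial tests.util.test_linearizer.LinearizerTestCase.test_lots_of_queued_things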