forked from MirrorHub/synapse
Merge branch 'release-v0.18.6' into develop
This commit is contained in:
commit
899a3a1268
5 changed files with 25 additions and 70 deletions
10
CHANGES.rst
10
CHANGES.rst
|
@ -1,3 +1,13 @@
|
|||
Changes in synapse v0.18.6-rc2 (2016-12-30)
|
||||
===========================================
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Fix memory leak in twisted by initialising logging correctly (PR #1731)
|
||||
* Fix bug where fetching missing events took an unacceptable amount of time in
|
||||
large rooms (PR #1734)
|
||||
|
||||
|
||||
Changes in synapse v0.18.6-rc1 (2016-12-29)
|
||||
===========================================
|
||||
|
||||
|
|
|
@ -16,4 +16,4 @@
|
|||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.18.6-rc1"
|
||||
__version__ = "0.18.6-rc2"
|
||||
|
|
|
@ -27,7 +27,6 @@ from synapse.util.caches.expiringcache import ExpiringCache
|
|||
from synapse.util.logutils import log_function
|
||||
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.types import get_domain_from_id
|
||||
import synapse.metrics
|
||||
|
||||
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
|
||||
|
@ -741,8 +740,6 @@ class FederationClient(FederationBase):
|
|||
signed_events = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, events, outlier=False
|
||||
)
|
||||
|
||||
have_gotten_all_from_destination = True
|
||||
except HttpResponseException as e:
|
||||
if not e.code == 400:
|
||||
raise
|
||||
|
@ -750,72 +747,6 @@ class FederationClient(FederationBase):
|
|||
# We are probably hitting an old server that doesn't support
|
||||
# get_missing_events
|
||||
signed_events = []
|
||||
have_gotten_all_from_destination = False
|
||||
|
||||
if len(signed_events) >= limit:
|
||||
defer.returnValue(signed_events)
|
||||
|
||||
users = yield self.state.get_current_user_in_room(room_id)
|
||||
servers = set(get_domain_from_id(u) for u in users)
|
||||
|
||||
servers = set(servers)
|
||||
servers.discard(self.server_name)
|
||||
|
||||
failed_to_fetch = set()
|
||||
|
||||
while len(signed_events) < limit:
|
||||
# Are we missing any?
|
||||
|
||||
seen_events = set(earliest_events_ids)
|
||||
seen_events.update(e.event_id for e in signed_events if e)
|
||||
|
||||
missing_events = {}
|
||||
for e in itertools.chain(latest_events, signed_events):
|
||||
if e.depth > min_depth:
|
||||
missing_events.update({
|
||||
e_id: e.depth for e_id, _ in e.prev_events
|
||||
if e_id not in seen_events
|
||||
and e_id not in failed_to_fetch
|
||||
})
|
||||
|
||||
if not missing_events:
|
||||
break
|
||||
|
||||
have_seen = yield self.store.have_events(missing_events)
|
||||
|
||||
for k in have_seen:
|
||||
missing_events.pop(k, None)
|
||||
|
||||
if not missing_events:
|
||||
break
|
||||
|
||||
# Okay, we haven't gotten everything yet. Lets get them.
|
||||
ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
|
||||
|
||||
if have_gotten_all_from_destination:
|
||||
servers.discard(destination)
|
||||
|
||||
def random_server_list():
|
||||
srvs = list(servers)
|
||||
random.shuffle(srvs)
|
||||
return srvs
|
||||
|
||||
deferreds = [
|
||||
preserve_fn(self.get_pdu)(
|
||||
destinations=random_server_list(),
|
||||
event_id=e_id,
|
||||
)
|
||||
for e_id, depth in ordered_missing[:limit - len(signed_events)]
|
||||
]
|
||||
|
||||
res = yield preserve_context_over_deferred(
|
||||
defer.DeferredList(deferreds, consumeErrors=True)
|
||||
)
|
||||
for (result, val), (e_id, _) in zip(res, ordered_missing):
|
||||
if result and val:
|
||||
signed_events.append(val)
|
||||
else:
|
||||
failed_to_fetch.add(e_id)
|
||||
|
||||
defer.returnValue(signed_events)
|
||||
|
||||
|
|
|
@ -538,7 +538,16 @@ class FederationServer(FederationBase):
|
|||
if get_missing and prevs - seen:
|
||||
# If we're missing stuff, ensure we only fetch stuff one
|
||||
# at a time.
|
||||
logger.info(
|
||||
"Acquiring lock for room %r to fetch %d missing events: %r...",
|
||||
pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
|
||||
)
|
||||
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
|
||||
logger.info(
|
||||
"Acquired lock for room %r to fetch %d missing events",
|
||||
pdu.room_id, len(prevs - seen),
|
||||
)
|
||||
|
||||
# We recalculate seen, since it may have changed.
|
||||
have_seen = yield self.store.have_events(prevs)
|
||||
seen = set(have_seen.keys())
|
||||
|
|
|
@ -23,6 +23,10 @@ from synapse.util import unwrapFirstError
|
|||
|
||||
from contextlib import contextmanager
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def sleep(seconds):
|
||||
|
@ -181,6 +185,7 @@ class Linearizer(object):
|
|||
self.key_to_defer[key] = new_defer
|
||||
|
||||
if current_defer:
|
||||
logger.info("Waiting to acquire linearizer lock for key %r", key)
|
||||
with PreserveLoggingContext():
|
||||
yield current_defer
|
||||
|
||||
|
|
Loading…
Reference in a new issue