mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 05:33:53 +01:00
Merge pull request #3244 from NotAFile/py3-six-4
replace some iteritems with six
This commit is contained in:
commit
1f69693347
15 changed files with 69 additions and 49 deletions
|
@ -26,6 +26,8 @@ from ._base import BaseHandler
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from six import itervalues, iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -318,7 +320,7 @@ class DeviceHandler(BaseHandler):
|
||||||
# The user may have left the room
|
# The user may have left the room
|
||||||
# TODO: Check if they actually did or if we were just invited.
|
# TODO: Check if they actually did or if we were just invited.
|
||||||
if room_id not in room_ids:
|
if room_id not in room_ids:
|
||||||
for key, event_id in current_state_ids.iteritems():
|
for key, event_id in iteritems(current_state_ids):
|
||||||
etype, state_key = key
|
etype, state_key = key
|
||||||
if etype != EventTypes.Member:
|
if etype != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
|
@ -338,7 +340,7 @@ class DeviceHandler(BaseHandler):
|
||||||
# special-case for an empty prev state: include all members
|
# special-case for an empty prev state: include all members
|
||||||
# in the changed list
|
# in the changed list
|
||||||
if not event_ids:
|
if not event_ids:
|
||||||
for key, event_id in current_state_ids.iteritems():
|
for key, event_id in iteritems(current_state_ids):
|
||||||
etype, state_key = key
|
etype, state_key = key
|
||||||
if etype != EventTypes.Member:
|
if etype != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
|
@ -354,10 +356,10 @@ class DeviceHandler(BaseHandler):
|
||||||
|
|
||||||
# Check if we've joined the room? If so we just blindly add all the users to
|
# Check if we've joined the room? If so we just blindly add all the users to
|
||||||
# the "possibly changed" users.
|
# the "possibly changed" users.
|
||||||
for state_dict in prev_state_ids.itervalues():
|
for state_dict in itervalues(prev_state_ids):
|
||||||
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
member_event = state_dict.get((EventTypes.Member, user_id), None)
|
||||||
if not member_event or member_event != current_member_id:
|
if not member_event or member_event != current_member_id:
|
||||||
for key, event_id in current_state_ids.iteritems():
|
for key, event_id in iteritems(current_state_ids):
|
||||||
etype, state_key = key
|
etype, state_key = key
|
||||||
if etype != EventTypes.Member:
|
if etype != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
|
@ -367,14 +369,14 @@ class DeviceHandler(BaseHandler):
|
||||||
# If there has been any change in membership, include them in the
|
# If there has been any change in membership, include them in the
|
||||||
# possibly changed list. We'll check if they are joined below,
|
# possibly changed list. We'll check if they are joined below,
|
||||||
# and we're not toooo worried about spuriously adding users.
|
# and we're not toooo worried about spuriously adding users.
|
||||||
for key, event_id in current_state_ids.iteritems():
|
for key, event_id in iteritems(current_state_ids):
|
||||||
etype, state_key = key
|
etype, state_key = key
|
||||||
if etype != EventTypes.Member:
|
if etype != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# check if this member has changed since any of the extremities
|
# check if this member has changed since any of the extremities
|
||||||
# at the stream_ordering, and add them to the list if so.
|
# at the stream_ordering, and add them to the list if so.
|
||||||
for state_dict in prev_state_ids.itervalues():
|
for state_dict in itervalues(prev_state_ids):
|
||||||
prev_event_id = state_dict.get(key, None)
|
prev_event_id = state_dict.get(key, None)
|
||||||
if not prev_event_id or prev_event_id != event_id:
|
if not prev_event_id or prev_event_id != event_id:
|
||||||
if state_key != user_id:
|
if state_key != user_id:
|
||||||
|
|
|
@ -19,6 +19,7 @@ import logging
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
SynapseError, CodeMessageException, FederationDeniedError,
|
SynapseError, CodeMessageException, FederationDeniedError,
|
||||||
|
@ -92,7 +93,7 @@ class E2eKeysHandler(object):
|
||||||
remote_queries_not_in_cache = {}
|
remote_queries_not_in_cache = {}
|
||||||
if remote_queries:
|
if remote_queries:
|
||||||
query_list = []
|
query_list = []
|
||||||
for user_id, device_ids in remote_queries.iteritems():
|
for user_id, device_ids in iteritems(remote_queries):
|
||||||
if device_ids:
|
if device_ids:
|
||||||
query_list.extend((user_id, device_id) for device_id in device_ids)
|
query_list.extend((user_id, device_id) for device_id in device_ids)
|
||||||
else:
|
else:
|
||||||
|
@ -103,9 +104,9 @@ class E2eKeysHandler(object):
|
||||||
query_list
|
query_list
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
for user_id, devices in remote_results.iteritems():
|
for user_id, devices in iteritems(remote_results):
|
||||||
user_devices = results.setdefault(user_id, {})
|
user_devices = results.setdefault(user_id, {})
|
||||||
for device_id, device in devices.iteritems():
|
for device_id, device in iteritems(devices):
|
||||||
keys = device.get("keys", None)
|
keys = device.get("keys", None)
|
||||||
device_display_name = device.get("device_display_name", None)
|
device_display_name = device.get("device_display_name", None)
|
||||||
if keys:
|
if keys:
|
||||||
|
@ -250,9 +251,9 @@ class E2eKeysHandler(object):
|
||||||
"Claimed one-time-keys: %s",
|
"Claimed one-time-keys: %s",
|
||||||
",".join((
|
",".join((
|
||||||
"%s for %s:%s" % (key_id, user_id, device_id)
|
"%s for %s:%s" % (key_id, user_id, device_id)
|
||||||
for user_id, user_keys in json_result.iteritems()
|
for user_id, user_keys in iteritems(json_result)
|
||||||
for device_id, device_keys in user_keys.iteritems()
|
for device_id, device_keys in iteritems(user_keys)
|
||||||
for key_id, _ in device_keys.iteritems()
|
for key_id, _ in iteritems(device_keys)
|
||||||
)),
|
)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
|
@ -449,7 +450,7 @@ class GroupsLocalHandler(object):
|
||||||
|
|
||||||
results = {}
|
results = {}
|
||||||
failed_results = []
|
failed_results = []
|
||||||
for destination, dest_user_ids in destinations.iteritems():
|
for destination, dest_user_ids in iteritems(destinations):
|
||||||
try:
|
try:
|
||||||
r = yield self.transport_client.bulk_get_publicised_groups(
|
r = yield self.transport_client.bulk_get_publicised_groups(
|
||||||
destination, list(dest_user_ids),
|
destination, list(dest_user_ids),
|
||||||
|
|
|
@ -25,6 +25,8 @@ The methods that define policy are:
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
from six import itervalues, iteritems
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.api.constants import PresenceState
|
from synapse.api.constants import PresenceState
|
||||||
from synapse.storage.presence import UserPresenceState
|
from synapse.storage.presence import UserPresenceState
|
||||||
|
@ -40,7 +42,6 @@ import synapse.metrics
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
@ -530,7 +531,7 @@ class PresenceHandler(object):
|
||||||
prev_state.copy_and_replace(
|
prev_state.copy_and_replace(
|
||||||
last_user_sync_ts=time_now_ms,
|
last_user_sync_ts=time_now_ms,
|
||||||
)
|
)
|
||||||
for prev_state in prev_states.itervalues()
|
for prev_state in itervalues(prev_states)
|
||||||
])
|
])
|
||||||
self.external_process_last_updated_ms.pop(process_id, None)
|
self.external_process_last_updated_ms.pop(process_id, None)
|
||||||
|
|
||||||
|
@ -553,14 +554,14 @@ class PresenceHandler(object):
|
||||||
for user_id in user_ids
|
for user_id in user_ids
|
||||||
}
|
}
|
||||||
|
|
||||||
missing = [user_id for user_id, state in states.iteritems() if not state]
|
missing = [user_id for user_id, state in iteritems(states) if not state]
|
||||||
if missing:
|
if missing:
|
||||||
# There are things not in our in memory cache. Lets pull them out of
|
# There are things not in our in memory cache. Lets pull them out of
|
||||||
# the database.
|
# the database.
|
||||||
res = yield self.store.get_presence_for_users(missing)
|
res = yield self.store.get_presence_for_users(missing)
|
||||||
states.update(res)
|
states.update(res)
|
||||||
|
|
||||||
missing = [user_id for user_id, state in states.iteritems() if not state]
|
missing = [user_id for user_id, state in iteritems(states) if not state]
|
||||||
if missing:
|
if missing:
|
||||||
new = {
|
new = {
|
||||||
user_id: UserPresenceState.default(user_id)
|
user_id: UserPresenceState.default(user_id)
|
||||||
|
@ -1048,7 +1049,7 @@ class PresenceEventSource(object):
|
||||||
defer.returnValue((updates.values(), max_token))
|
defer.returnValue((updates.values(), max_token))
|
||||||
else:
|
else:
|
||||||
defer.returnValue(([
|
defer.returnValue(([
|
||||||
s for s in updates.itervalues()
|
s for s in itervalues(updates)
|
||||||
if s.state != PresenceState.OFFLINE
|
if s.state != PresenceState.OFFLINE
|
||||||
], max_token))
|
], max_token))
|
||||||
|
|
||||||
|
@ -1305,11 +1306,11 @@ def get_interested_remotes(store, states, state_handler):
|
||||||
# hosts in those rooms.
|
# hosts in those rooms.
|
||||||
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
|
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
|
||||||
|
|
||||||
for room_id, states in room_ids_to_states.iteritems():
|
for room_id, states in iteritems(room_ids_to_states):
|
||||||
hosts = yield state_handler.get_current_hosts_in_room(room_id)
|
hosts = yield state_handler.get_current_hosts_in_room(room_id)
|
||||||
hosts_and_states.append((hosts, states))
|
hosts_and_states.append((hosts, states))
|
||||||
|
|
||||||
for user_id, states in users_to_states.iteritems():
|
for user_id, states in iteritems(users_to_states):
|
||||||
host = get_domain_from_id(user_id)
|
host = get_domain_from_id(user_id)
|
||||||
hosts_and_states.append(([host], states))
|
hosts_and_states.append(([host], states))
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,8 @@ import collections
|
||||||
import logging
|
import logging
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
|
from six import itervalues, iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -275,7 +277,7 @@ class SyncHandler(object):
|
||||||
# result returned by the event source is poor form (it might cache
|
# result returned by the event source is poor form (it might cache
|
||||||
# the object)
|
# the object)
|
||||||
room_id = event["room_id"]
|
room_id = event["room_id"]
|
||||||
event_copy = {k: v for (k, v) in event.iteritems()
|
event_copy = {k: v for (k, v) in iteritems(event)
|
||||||
if k != "room_id"}
|
if k != "room_id"}
|
||||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||||
|
|
||||||
|
@ -294,7 +296,7 @@ class SyncHandler(object):
|
||||||
for event in receipts:
|
for event in receipts:
|
||||||
room_id = event["room_id"]
|
room_id = event["room_id"]
|
||||||
# exclude room id, as above
|
# exclude room id, as above
|
||||||
event_copy = {k: v for (k, v) in event.iteritems()
|
event_copy = {k: v for (k, v) in iteritems(event)
|
||||||
if k != "room_id"}
|
if k != "room_id"}
|
||||||
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
|
||||||
|
|
||||||
|
@ -325,7 +327,7 @@ class SyncHandler(object):
|
||||||
current_state_ids = frozenset()
|
current_state_ids = frozenset()
|
||||||
if any(e.is_state() for e in recents):
|
if any(e.is_state() for e in recents):
|
||||||
current_state_ids = yield self.state.get_current_state_ids(room_id)
|
current_state_ids = yield self.state.get_current_state_ids(room_id)
|
||||||
current_state_ids = frozenset(current_state_ids.itervalues())
|
current_state_ids = frozenset(itervalues(current_state_ids))
|
||||||
|
|
||||||
recents = yield filter_events_for_client(
|
recents = yield filter_events_for_client(
|
||||||
self.store,
|
self.store,
|
||||||
|
@ -382,7 +384,7 @@ class SyncHandler(object):
|
||||||
current_state_ids = frozenset()
|
current_state_ids = frozenset()
|
||||||
if any(e.is_state() for e in loaded_recents):
|
if any(e.is_state() for e in loaded_recents):
|
||||||
current_state_ids = yield self.state.get_current_state_ids(room_id)
|
current_state_ids = yield self.state.get_current_state_ids(room_id)
|
||||||
current_state_ids = frozenset(current_state_ids.itervalues())
|
current_state_ids = frozenset(itervalues(current_state_ids))
|
||||||
|
|
||||||
loaded_recents = yield filter_events_for_client(
|
loaded_recents = yield filter_events_for_client(
|
||||||
self.store,
|
self.store,
|
||||||
|
@ -984,7 +986,7 @@ class SyncHandler(object):
|
||||||
if since_token:
|
if since_token:
|
||||||
for joined_sync in sync_result_builder.joined:
|
for joined_sync in sync_result_builder.joined:
|
||||||
it = itertools.chain(
|
it = itertools.chain(
|
||||||
joined_sync.timeline.events, joined_sync.state.itervalues()
|
joined_sync.timeline.events, itervalues(joined_sync.state)
|
||||||
)
|
)
|
||||||
for event in it:
|
for event in it:
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
|
@ -1062,7 +1064,7 @@ class SyncHandler(object):
|
||||||
newly_left_rooms = []
|
newly_left_rooms = []
|
||||||
room_entries = []
|
room_entries = []
|
||||||
invited = []
|
invited = []
|
||||||
for room_id, events in mem_change_events_by_room_id.iteritems():
|
for room_id, events in iteritems(mem_change_events_by_room_id):
|
||||||
non_joins = [e for e in events if e.membership != Membership.JOIN]
|
non_joins = [e for e in events if e.membership != Membership.JOIN]
|
||||||
has_join = len(non_joins) != len(events)
|
has_join = len(non_joins) != len(events)
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ from synapse.util.metrics import Measure
|
||||||
from synapse.util.async import sleep
|
from synapse.util.async import sleep
|
||||||
from synapse.types import get_localpart_from_id
|
from synapse.types import get_localpart_from_id
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -410,7 +411,7 @@ class UserDirectoryHandler(object):
|
||||||
|
|
||||||
if change:
|
if change:
|
||||||
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
||||||
for user_id, profile in users_with_profile.iteritems():
|
for user_id, profile in iteritems(users_with_profile):
|
||||||
yield self._handle_new_user(room_id, user_id, profile)
|
yield self._handle_new_user(room_id, user_id, profile)
|
||||||
else:
|
else:
|
||||||
users = yield self.store.get_users_in_public_due_to_room(room_id)
|
users = yield self.store.get_users_in_public_due_to_room(room_id)
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
TICKS_PER_SEC = 100
|
TICKS_PER_SEC = 100
|
||||||
BYTES_PER_PAGE = 4096
|
BYTES_PER_PAGE = 4096
|
||||||
|
@ -55,7 +56,7 @@ def update_resource_metrics():
|
||||||
# line is PID (command) more stats go here ...
|
# line is PID (command) more stats go here ...
|
||||||
raw_stats = line.split(") ", 1)[1].split(" ")
|
raw_stats = line.split(") ", 1)[1].split(" ")
|
||||||
|
|
||||||
for (name, index) in STAT_FIELDS.iteritems():
|
for (name, index) in iteritems(STAT_FIELDS):
|
||||||
# subtract 3 from the index, because proc(5) is 1-based, and
|
# subtract 3 from the index, because proc(5) is 1-based, and
|
||||||
# we've lost the first two fields in PID and COMMAND above
|
# we've lost the first two fields in PID and COMMAND above
|
||||||
stats[name] = int(raw_stats[index - 3])
|
stats[name] = int(raw_stats[index - 3])
|
||||||
|
|
|
@ -30,6 +30,7 @@ from synapse.state import POWER_KEY
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
|
||||||
|
from six import itervalues, iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -126,7 +127,7 @@ class BulkPushRuleEvaluator(object):
|
||||||
)
|
)
|
||||||
auth_events = yield self.store.get_events(auth_events_ids)
|
auth_events = yield self.store.get_events(auth_events_ids)
|
||||||
auth_events = {
|
auth_events = {
|
||||||
(e.type, e.state_key): e for e in auth_events.itervalues()
|
(e.type, e.state_key): e for e in itervalues(auth_events)
|
||||||
}
|
}
|
||||||
|
|
||||||
sender_level = get_user_power_level(event.sender, auth_events)
|
sender_level = get_user_power_level(event.sender, auth_events)
|
||||||
|
@ -160,7 +161,7 @@ class BulkPushRuleEvaluator(object):
|
||||||
|
|
||||||
condition_cache = {}
|
condition_cache = {}
|
||||||
|
|
||||||
for uid, rules in rules_by_user.iteritems():
|
for uid, rules in iteritems(rules_by_user):
|
||||||
if event.sender == uid:
|
if event.sender == uid:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -406,7 +407,7 @@ class RulesForRoom(object):
|
||||||
# If the event is a join event then it will be in current state evnts
|
# If the event is a join event then it will be in current state evnts
|
||||||
# map but not in the DB, so we have to explicitly insert it.
|
# map but not in the DB, so we have to explicitly insert it.
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
for event_id in member_event_ids.itervalues():
|
for event_id in itervalues(member_event_ids):
|
||||||
if event_id == event.event_id:
|
if event_id == event.event_id:
|
||||||
members[event_id] = (event.state_key, event.membership)
|
members[event_id] = (event.state_key, event.membership)
|
||||||
|
|
||||||
|
@ -414,7 +415,7 @@ class RulesForRoom(object):
|
||||||
logger.debug("Found members %r: %r", self.room_id, members.values())
|
logger.debug("Found members %r: %r", self.room_id, members.values())
|
||||||
|
|
||||||
interested_in_user_ids = set(
|
interested_in_user_ids = set(
|
||||||
user_id for user_id, membership in members.itervalues()
|
user_id for user_id, membership in itervalues(members)
|
||||||
if membership == Membership.JOIN
|
if membership == Membership.JOIN
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -426,7 +427,7 @@ class RulesForRoom(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
user_ids = set(
|
user_ids = set(
|
||||||
uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
|
uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("With pushers: %r", user_ids)
|
logger.debug("With pushers: %r", user_ids)
|
||||||
|
@ -447,7 +448,7 @@ class RulesForRoom(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
ret_rules_by_user.update(
|
ret_rules_by_user.update(
|
||||||
item for item in rules_by_user.iteritems() if item[0] is not None
|
item for item in iteritems(rules_by_user) if item[0] is not None
|
||||||
)
|
)
|
||||||
|
|
||||||
self.update_cache(sequence, members, ret_rules_by_user, state_group)
|
self.update_cache(sequence, members, ret_rules_by_user, state_group)
|
||||||
|
|
|
@ -68,6 +68,7 @@ import synapse.metrics
|
||||||
import struct
|
import struct
|
||||||
import fcntl
|
import fcntl
|
||||||
|
|
||||||
|
from six import iterkeys, iteritems
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
|
@ -392,7 +393,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
|
|
||||||
if stream_name == "ALL":
|
if stream_name == "ALL":
|
||||||
# Subscribe to all streams we're publishing to.
|
# Subscribe to all streams we're publishing to.
|
||||||
for stream in self.streamer.streams_by_name.iterkeys():
|
for stream in iterkeys(self.streamer.streams_by_name):
|
||||||
self.subscribe_to_stream(stream, token)
|
self.subscribe_to_stream(stream, token)
|
||||||
else:
|
else:
|
||||||
self.subscribe_to_stream(stream_name, token)
|
self.subscribe_to_stream(stream_name, token)
|
||||||
|
@ -498,7 +499,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
BaseReplicationStreamProtocol.connectionMade(self)
|
BaseReplicationStreamProtocol.connectionMade(self)
|
||||||
|
|
||||||
# Once we've connected subscribe to the necessary streams
|
# Once we've connected subscribe to the necessary streams
|
||||||
for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
|
for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
|
||||||
self.replicate(stream_name, token)
|
self.replicate(stream_name, token)
|
||||||
|
|
||||||
# Tell the server if we have any users currently syncing (should only
|
# Tell the server if we have any users currently syncing (should only
|
||||||
|
@ -633,7 +634,7 @@ metrics.register_callback(
|
||||||
lambda: {
|
lambda: {
|
||||||
(k[0], p.name, p.conn_id): count
|
(k[0], p.name, p.conn_id): count
|
||||||
for p in connected_connections
|
for p in connected_connections
|
||||||
for k, count in p.inbound_commands_counter.counts.iteritems()
|
for k, count in iteritems(p.inbound_commands_counter.counts)
|
||||||
},
|
},
|
||||||
labels=["command", "name", "conn_id"],
|
labels=["command", "name", "conn_id"],
|
||||||
)
|
)
|
||||||
|
@ -643,7 +644,7 @@ metrics.register_callback(
|
||||||
lambda: {
|
lambda: {
|
||||||
(k[0], p.name, p.conn_id): count
|
(k[0], p.name, p.conn_id): count
|
||||||
for p in connected_connections
|
for p in connected_connections
|
||||||
for k, count in p.outbound_commands_counter.counts.iteritems()
|
for k, count in iteritems(p.outbound_commands_counter.counts)
|
||||||
},
|
},
|
||||||
labels=["command", "name", "conn_id"],
|
labels=["command", "name", "conn_id"],
|
||||||
)
|
)
|
||||||
|
|
|
@ -26,6 +26,7 @@ from synapse.util.metrics import Measure, measure_func
|
||||||
import logging
|
import logging
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
|
from six import itervalues
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
stream_updates_counter = metrics.register_counter(
|
stream_updates_counter = metrics.register_counter(
|
||||||
|
@ -80,7 +81,7 @@ class ReplicationStreamer(object):
|
||||||
# We only support federation stream if federation sending hase been
|
# We only support federation stream if federation sending hase been
|
||||||
# disabled on the master.
|
# disabled on the master.
|
||||||
self.streams = [
|
self.streams = [
|
||||||
stream(hs) for stream in STREAMS_MAP.itervalues()
|
stream(hs) for stream in itervalues(STREAMS_MAP)
|
||||||
if stream != FederationStream or not hs.config.send_federation
|
if stream != FederationStream or not hs.config.send_federation
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,7 @@ import shutil
|
||||||
import cgi
|
import cgi
|
||||||
import logging
|
import logging
|
||||||
from six.moves.urllib import parse as urlparse
|
from six.moves.urllib import parse as urlparse
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -603,7 +604,7 @@ class MediaRepository(object):
|
||||||
thumbnails[(t_width, t_height, r_type)] = r_method
|
thumbnails[(t_width, t_height, r_type)] = r_method
|
||||||
|
|
||||||
# Now we generate the thumbnails for each dimension, store it
|
# Now we generate the thumbnails for each dimension, store it
|
||||||
for (t_width, t_height, t_type), t_method in thumbnails.iteritems():
|
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
|
||||||
# Generate the thumbnail
|
# Generate the thumbnail
|
||||||
if t_method == "crop":
|
if t_method == "crop":
|
||||||
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
|
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
|
||||||
|
|
|
@ -22,6 +22,8 @@ from . import background_updates
|
||||||
|
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||||
def _update_client_ips_batch_txn(self, txn, to_update):
|
def _update_client_ips_batch_txn(self, txn, to_update):
|
||||||
self.database_engine.lock_table(txn, "user_ips")
|
self.database_engine.lock_table(txn, "user_ips")
|
||||||
|
|
||||||
for entry in to_update.iteritems():
|
for entry in iteritems(to_update):
|
||||||
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
|
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
|
||||||
|
|
||||||
self._simple_upsert_txn(
|
self._simple_upsert_txn(
|
||||||
|
@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||||
"user_agent": user_agent,
|
"user_agent": user_agent,
|
||||||
"last_seen": last_seen,
|
"last_seen": last_seen,
|
||||||
}
|
}
|
||||||
for (access_token, ip), (user_agent, last_seen) in results.iteritems()
|
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
|
||||||
))
|
))
|
||||||
|
|
|
@ -21,6 +21,7 @@ from synapse.api.errors import StoreError
|
||||||
from ._base import SQLBaseStore, Cache
|
from ._base import SQLBaseStore, Cache
|
||||||
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
|
||||||
|
|
||||||
|
from six import itervalues, iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore):
|
||||||
return (now_stream_id, [])
|
return (now_stream_id, [])
|
||||||
|
|
||||||
if len(query_map) >= 20:
|
if len(query_map) >= 20:
|
||||||
now_stream_id = max(stream_id for stream_id in query_map.itervalues())
|
now_stream_id = max(stream_id for stream_id in itervalues(query_map))
|
||||||
|
|
||||||
devices = self._get_e2e_device_keys_txn(
|
devices = self._get_e2e_device_keys_txn(
|
||||||
txn, query_map.keys(), include_all_devices=True
|
txn, query_map.keys(), include_all_devices=True
|
||||||
|
@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
for user_id, user_devices in devices.iteritems():
|
for user_id, user_devices in iteritems(devices):
|
||||||
# The prev_id for the first row is always the last row before
|
# The prev_id for the first row is always the last row before
|
||||||
# `from_stream_id`
|
# `from_stream_id`
|
||||||
txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
|
txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
|
||||||
rows = txn.fetchall()
|
rows = txn.fetchall()
|
||||||
prev_id = rows[0][0]
|
prev_id = rows[0][0]
|
||||||
for device_id, device in user_devices.iteritems():
|
for device_id, device in iteritems(user_devices):
|
||||||
stream_id = query_map[(user_id, device_id)]
|
stream_id = query_map[(user_id, device_id)]
|
||||||
result = {
|
result = {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
|
@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore):
|
||||||
if devices:
|
if devices:
|
||||||
user_devices = devices[user_id]
|
user_devices = devices[user_id]
|
||||||
results = []
|
results = []
|
||||||
for device_id, device in user_devices.iteritems():
|
for device_id, device in iteritems(user_devices):
|
||||||
result = {
|
result = {
|
||||||
"device_id": device_id,
|
"device_id": device_id,
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,8 @@ import simplejson as json
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
|
|
||||||
class EndToEndKeyStore(SQLBaseStore):
|
class EndToEndKeyStore(SQLBaseStore):
|
||||||
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
|
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
|
||||||
|
@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore):
|
||||||
query_list, include_all_devices,
|
query_list, include_all_devices,
|
||||||
)
|
)
|
||||||
|
|
||||||
for user_id, device_keys in results.iteritems():
|
for user_id, device_keys in iteritems(results):
|
||||||
for device_id, device_info in device_keys.iteritems():
|
for device_id, device_info in iteritems(device_keys):
|
||||||
device_info["keys"] = json.loads(device_info.pop("key_json"))
|
device_info["keys"] = json.loads(device_info.pop("key_json"))
|
||||||
|
|
||||||
defer.returnValue(results)
|
defer.returnValue(results)
|
||||||
|
|
|
@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||||
import logging
|
import logging
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
txn.executemany(sql, (
|
txn.executemany(sql, (
|
||||||
_gen_entry(user_id, actions)
|
_gen_entry(user_id, actions)
|
||||||
for user_id, actions in user_id_actions.iteritems()
|
for user_id, actions in iteritems(user_id_actions)
|
||||||
))
|
))
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
|
|
Loading…
Reference in a new issue