Merge branch 'master' of github.com:matrix-org/synapse into develop

This commit is contained in:
Erik Johnston 2015-05-22 10:33:00 +01:00
commit 4429e720ae
11 changed files with 94 additions and 37 deletions

View file

@ -1,3 +1,9 @@
Changes in synapse v0.9.0-r5 (2015-05-21)
=========================================
* Add more database caches to reduce amount of work done for each pusher. This
radically reduces CPU usage when multiple pushers are set up in the same room.
Changes in synapse v0.9.0 (2015-05-07)
======================================

View file

@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.9.0-r4"
__version__ = "0.9.0-r5"

View file

@ -536,9 +536,7 @@ class RoomListHandler(BaseHandler):
chunk = yield self.store.get_rooms(is_public=True)
results = yield defer.gatherResults(
[
self.store.get_users_in_room(
room_id=room["room_id"],
)
self.store.get_users_in_room(room["room_id"])
for room in chunk
],
consumeErrors=True,

View file

@ -84,25 +84,20 @@ class Pusher(object):
rules = baserules.list_with_base_rules(rawrules, user)
room_id = ev['room_id']
# get *our* member event for display name matching
member_events_for_room = yield self.store.get_current_state(
room_id=ev['room_id'],
event_type='m.room.member',
state_key=None
)
my_display_name = None
room_member_count = 0
for mev in member_events_for_room:
if mev.content['membership'] != 'join':
continue
our_member_event = yield self.store.get_current_state(
room_id=room_id,
event_type='m.room.member',
state_key=self.user_name,
)
if our_member_event:
my_display_name = our_member_event[0].content.get("displayname")
# This loop does two things:
# 1) Find our current display name
if mev.state_key == self.user_name and 'displayname' in mev.content:
my_display_name = mev.content['displayname']
# and 2) Get the number of people in that room
room_member_count += 1
room_members = yield self.store.get_users_in_room(room_id)
room_member_count = len(room_members)
for r in rules:
if r['rule_id'] in enabled_map:
@ -287,9 +282,11 @@ class Pusher(object):
if len(actions) == 0:
logger.warn("Empty actions! Using default action.")
actions = Pusher.DEFAULT_ACTIONS
if 'notify' not in actions and 'dont_notify' not in actions:
logger.warn("Neither notify nor dont_notify in actions: adding default")
actions.extend(Pusher.DEFAULT_ACTIONS)
if 'dont_notify' in actions:
logger.debug(
"%s for %s: dont_notify",

View file

@ -121,6 +121,11 @@ class Cache(object):
self.sequence += 1
self.cache.pop(keyargs, None)
def invalidate_all(self):
self.check_thread()
self.sequence += 1
self.cache.clear()
def cached(max_entries=1000, num_args=1, lru=False):
""" A method decorator that applies a memoizing cache around the function.
@ -172,6 +177,7 @@ def cached(max_entries=1000, num_args=1, lru=False):
defer.returnValue(ret)
wrapped.invalidate = cache.invalidate
wrapped.invalidate_all = cache.invalidate_all
wrapped.prefill = cache.prefill
return wrapped

View file

@ -125,6 +125,12 @@ class EventsStore(SQLBaseStore):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.call_after(self.get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, event.room_id)
txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
txn.call_after(self.get_room_name_and_aliases, event.room_id)
self._simple_delete_txn(
txn,
table="current_state_events",
@ -132,13 +138,6 @@ class EventsStore(SQLBaseStore):
)
for s in current_state:
if s.type == EventTypes.Member:
txn.call_after(
self.get_rooms_for_user.invalidate, s.state_key
)
txn.call_after(
self.get_joined_hosts_for_room.invalidate, s.room_id
)
self._simple_insert_txn(
txn,
"current_state_events",
@ -356,6 +355,18 @@ class EventsStore(SQLBaseStore):
)
if is_new_state and not context.rejected:
txn.call_after(
self.get_current_state_for_key.invalidate,
event.room_id, event.type, event.state_key
)
if (event.type == EventTypes.Name
or event.type == EventTypes.Aliases):
txn.call_after(
self.get_room_name_and_aliases.invalidate,
event.room_id
)
self._simple_upsert_txn(
txn,
"current_state_events",

View file

@ -13,9 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from ._base import SQLBaseStore, Table
from ._base import SQLBaseStore, cached
from twisted.internet import defer
import logging
@ -41,6 +39,7 @@ class PushRuleStore(SQLBaseStore):
defer.returnValue(rows)
@cached()
@defer.inlineCallbacks
def get_push_rules_enabled_for_user(self, user_name):
results = yield self._simple_select_list(
@ -151,6 +150,10 @@ class PushRuleStore(SQLBaseStore):
txn.execute(sql, (user_name, priority_class, new_rule_priority))
txn.call_after(
self.get_push_rules_enabled_for_user.invalidate, user_name
)
self._simple_insert_txn(
txn,
table=PushRuleTable.table_name,
@ -179,6 +182,10 @@ class PushRuleStore(SQLBaseStore):
new_rule['priority_class'] = priority_class
new_rule['priority'] = new_prio
txn.call_after(
self.get_push_rules_enabled_for_user.invalidate, user_name
)
self._simple_insert_txn(
txn,
table=PushRuleTable.table_name,
@ -201,6 +208,7 @@ class PushRuleStore(SQLBaseStore):
{'user_name': user_name, 'rule_id': rule_id},
desc="delete_push_rule",
)
self.get_push_rules_enabled_for_user.invalidate(user_name)
@defer.inlineCallbacks
def set_push_rule_enabled(self, user_name, rule_id, enabled):
@ -220,6 +228,7 @@ class PushRuleStore(SQLBaseStore):
{'enabled': 1 if enabled else 0},
{'id': new_id},
)
self.get_push_rules_enabled_for_user.invalidate(user_name)
class RuleNotFoundException(Exception):
@ -230,7 +239,7 @@ class InconsistentRuleException(Exception):
pass
class PushRuleTable(Table):
class PushRuleTable(object):
table_name = "push_rules"
fields = [
@ -243,10 +252,8 @@ class PushRuleTable(Table):
"actions",
]
EntryType = collections.namedtuple("PushRuleEntry", fields)
class PushRuleEnableTable(Table):
class PushRuleEnableTable(object):
table_name = "push_rules_enable"
fields = [

View file

@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from ._base import SQLBaseStore
from ._base import SQLBaseStore, cached
import collections
import logging
@ -186,6 +186,7 @@ class RoomStore(SQLBaseStore):
}
)
@cached()
@defer.inlineCallbacks
def get_room_name_and_aliases(self, room_id):
def f(txn):

View file

@ -66,6 +66,7 @@ class RoomMemberStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate, target_user_id)
txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
txn.call_after(self.get_users_in_room.invalidate, event.room_id)
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
@ -87,6 +88,7 @@ class RoomMemberStore(SQLBaseStore):
lambda events: events[0] if events else None
)
@cached()
def get_users_in_room(self, room_id):
def f(txn):

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore
from ._base import SQLBaseStore, cached
from twisted.internet import defer
@ -143,6 +143,12 @@ class StateStore(SQLBaseStore):
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
if event_type and state_key is not None:
result = yield self.get_current_state_for_key(
room_id, event_type, state_key
)
defer.returnValue(result)
def f(txn):
sql = (
"SELECT event_id FROM current_state_events"
@ -167,6 +173,23 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
@cached(num_args=3)
@defer.inlineCallbacks
def get_current_state_for_key(self, room_id, event_type, state_key):
def f(txn):
sql = (
"SELECT event_id FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?"
)
args = (room_id, event_type, state_key)
txn.execute(sql, args)
results = txn.fetchall()
return [r[0] for r in results]
event_ids = yield self.runInteraction("get_current_state_for_key", f)
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
def _make_group_id(clock):
return str(int(clock.time_msec())) + random_string(5)

View file

@ -20,7 +20,6 @@ import threading
class LruCache(object):
"""Least-recently-used cache."""
# TODO(mjark) Add mutex for linked list for thread safety.
def __init__(self, max_size):
cache = {}
list_root = []
@ -105,6 +104,12 @@ class LruCache(object):
else:
return default
@synchronized
def cache_clear():
list_root[NEXT] = list_root
list_root[PREV] = list_root
cache.clear()
@synchronized
def cache_len():
return len(cache)
@ -120,6 +125,7 @@ class LruCache(object):
self.pop = cache_pop
self.len = cache_len
self.contains = cache_contains
self.clear = cache_clear
def __getitem__(self, key):
result = self.get(key, self.sentinel)