forked from MirrorHub/synapse
Merge pull request #3597 from matrix-org/erikj/did_forget
Fix client_reader worker being able to handle /context requests
This commit is contained in:
commit
1e5dbdcbb1
5 changed files with 38 additions and 34 deletions
1
changelog.d/3597.feature
Normal file
1
changelog.d/3597.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add support for client_reader to handle more APIs
|
|
@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite
|
||||||
from synapse.metrics import RegistryProxy
|
from synapse.metrics import RegistryProxy
|
||||||
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
|
||||||
from synapse.replication.slave.storage._base import BaseSlavedStore
|
from synapse.replication.slave.storage._base import BaseSlavedStore
|
||||||
|
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
|
||||||
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
|
||||||
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||||
from synapse.replication.slave.storage.directory import DirectoryStore
|
from synapse.replication.slave.storage.directory import DirectoryStore
|
||||||
|
@ -58,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader")
|
||||||
|
|
||||||
|
|
||||||
class ClientReaderSlavedStore(
|
class ClientReaderSlavedStore(
|
||||||
|
SlavedAccountDataStore,
|
||||||
SlavedEventStore,
|
SlavedEventStore,
|
||||||
SlavedKeyStore,
|
SlavedKeyStore,
|
||||||
RoomStore,
|
RoomStore,
|
||||||
|
|
|
@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.storage.presence import UserPresenceState
|
from synapse.storage.presence import UserPresenceState
|
||||||
from synapse.storage.roommember import RoomMemberStore
|
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||||
from synapse.util.manhole import manhole
|
from synapse.util.manhole import manhole
|
||||||
|
@ -81,9 +80,7 @@ class SynchrotronSlavedStore(
|
||||||
RoomStore,
|
RoomStore,
|
||||||
BaseSlavedStore,
|
BaseSlavedStore,
|
||||||
):
|
):
|
||||||
did_forget = (
|
pass
|
||||||
RoomMemberStore.__dict__["did_forget"]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
UPDATE_SYNCING_USERS_MS = 10 * 1000
|
||||||
|
|
|
@ -24,7 +24,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
|
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
|
||||||
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
|
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
|
||||||
from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
|
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
|
||||||
from synapse.util import stringutils
|
from synapse.util import stringutils
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
|
||||||
|
@ -418,8 +418,6 @@ class RoomContextHandler(object):
|
||||||
before_limit = math.floor(limit / 2.)
|
before_limit = math.floor(limit / 2.)
|
||||||
after_limit = limit - before_limit
|
after_limit = limit - before_limit
|
||||||
|
|
||||||
now_token = yield self.hs.get_event_sources().get_current_token()
|
|
||||||
|
|
||||||
users = yield self.store.get_users_in_room(room_id)
|
users = yield self.store.get_users_in_room(room_id)
|
||||||
is_peeking = user.to_string() not in users
|
is_peeking = user.to_string() not in users
|
||||||
|
|
||||||
|
@ -462,11 +460,15 @@ class RoomContextHandler(object):
|
||||||
)
|
)
|
||||||
results["state"] = list(state[last_event_id].values())
|
results["state"] = list(state[last_event_id].values())
|
||||||
|
|
||||||
results["start"] = now_token.copy_and_replace(
|
# We use a dummy token here as we only care about the room portion of
|
||||||
|
# the token, which we replace.
|
||||||
|
token = StreamToken.START
|
||||||
|
|
||||||
|
results["start"] = token.copy_and_replace(
|
||||||
"room_key", results["start"]
|
"room_key", results["start"]
|
||||||
).to_string()
|
).to_string()
|
||||||
|
|
||||||
results["end"] = now_token.copy_and_replace(
|
results["end"] = token.copy_and_replace(
|
||||||
"room_key", results["end"]
|
"room_key", results["end"]
|
||||||
).to_string()
|
).to_string()
|
||||||
|
|
||||||
|
|
|
@ -461,6 +461,30 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||||
def _get_joined_hosts_cache(self, room_id):
|
def _get_joined_hosts_cache(self, room_id):
|
||||||
return _JoinedHostsCache(self, room_id)
|
return _JoinedHostsCache(self, room_id)
|
||||||
|
|
||||||
|
@cachedInlineCallbacks(num_args=2)
|
||||||
|
def did_forget(self, user_id, room_id):
|
||||||
|
"""Returns whether user_id has elected to discard history for room_id.
|
||||||
|
|
||||||
|
Returns False if they have since re-joined."""
|
||||||
|
def f(txn):
|
||||||
|
sql = (
|
||||||
|
"SELECT"
|
||||||
|
" COUNT(*)"
|
||||||
|
" FROM"
|
||||||
|
" room_memberships"
|
||||||
|
" WHERE"
|
||||||
|
" user_id = ?"
|
||||||
|
" AND"
|
||||||
|
" room_id = ?"
|
||||||
|
" AND"
|
||||||
|
" forgotten = 0"
|
||||||
|
)
|
||||||
|
txn.execute(sql, (user_id, room_id))
|
||||||
|
rows = txn.fetchall()
|
||||||
|
return rows[0][0]
|
||||||
|
count = yield self.runInteraction("did_forget_membership", f)
|
||||||
|
defer.returnValue(count == 0)
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberStore(RoomMemberWorkerStore):
|
class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
def __init__(self, db_conn, hs):
|
def __init__(self, db_conn, hs):
|
||||||
|
@ -568,32 +592,10 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id, room_id))
|
txn.execute(sql, (user_id, room_id))
|
||||||
|
|
||||||
txn.call_after(self.did_forget.invalidate, (user_id, room_id))
|
self._invalidate_cache_and_stream(
|
||||||
return self.runInteraction("forget_membership", f)
|
txn, self.did_forget, (user_id, room_id,),
|
||||||
|
|
||||||
@cachedInlineCallbacks(num_args=2)
|
|
||||||
def did_forget(self, user_id, room_id):
|
|
||||||
"""Returns whether user_id has elected to discard history for room_id.
|
|
||||||
|
|
||||||
Returns False if they have since re-joined."""
|
|
||||||
def f(txn):
|
|
||||||
sql = (
|
|
||||||
"SELECT"
|
|
||||||
" COUNT(*)"
|
|
||||||
" FROM"
|
|
||||||
" room_memberships"
|
|
||||||
" WHERE"
|
|
||||||
" user_id = ?"
|
|
||||||
" AND"
|
|
||||||
" room_id = ?"
|
|
||||||
" AND"
|
|
||||||
" forgotten = 0"
|
|
||||||
)
|
)
|
||||||
txn.execute(sql, (user_id, room_id))
|
return self.runInteraction("forget_membership", f)
|
||||||
rows = txn.fetchall()
|
|
||||||
return rows[0][0]
|
|
||||||
count = yield self.runInteraction("did_forget_membership", f)
|
|
||||||
defer.returnValue(count == 0)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _background_add_membership_profile(self, progress, batch_size):
|
def _background_add_membership_profile(self, progress, batch_size):
|
||||||
|
|
Loading…
Reference in a new issue