From 62b51b84520243d25b82a303ff44ff2f329c8d82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Apr 2016 12:00:51 +0100 Subject: [PATCH 01/12] Fix typo in event_auth servlet path --- synapse/federation/transport/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index d65a7893d..3e552b6c4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -323,7 +323,7 @@ class FederationSendLeaveServlet(BaseFederationServlet): class FederationEventAuthServlet(BaseFederationServlet): - PATH = "/event_auth(?P[^/]*)/(?P[^/]*)" + PATH = "/event_auth/(?P[^/]*)/(?P[^/]*)" def on_GET(self, origin, content, query, context, event_id): return self.handler.on_event_auth(origin, context, event_id) From dc2c527ce9618660f8f2c7e8947b528d06418a28 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Apr 2016 12:07:54 +0100 Subject: [PATCH 02/12] Fix password reset Default requester to None, otherwise it isn't defined when resetting using email auth --- synapse/rest/client/v2_alpha/account.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 7f8a6a4cf..c88c27053 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -52,6 +52,7 @@ class PasswordRestServlet(RestServlet): defer.returnValue((401, result)) user_id = None + requester = None if LoginType.PASSWORD in result: # if using password, they should also be logged in From 984d4a2c0f59039a623b6a6f1945ff697f004c27 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 May 2016 11:28:10 +0100 Subject: [PATCH 03/12] Add /report endpoint --- synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/report_event.py | 59 ++++++++++++++++++++ synapse/storage/prepare_database.py | 2 +- synapse/storage/room.py | 14 +++++ synapse/storage/schema/delta/32/reports.sql | 23 ++++++++ 5 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 synapse/rest/client/v2_alpha/report_event.py create mode 100644 synapse/storage/schema/delta/32/reports.sql diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 6688fa8fa..e805cb911 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -44,6 +44,7 @@ from synapse.rest.client.v2_alpha import ( tokenrefresh, tags, account_data, + report_event, ) from synapse.http.server import JsonResource @@ -86,3 +87,4 @@ class ClientRestResource(JsonResource): tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) + report_event.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py new file mode 100644 index 000000000..412e5b190 --- /dev/null +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from ._base import client_v2_patterns + +import logging + + +logger = logging.getLogger(__name__) + + +class ReportEventRestServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/rooms/(?P[^/]*)/report$" + ) + + def __init__(self, hs): + super(ReportEventRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def on_POST(self, request, room_id): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + body = parse_json_object_from_request(request) + + event_id = body["event_id"] + + yield self.store.add_event_report( + room_id=room_id, + event_id=event_id, + user_id=user_id, + reason=body.get("reason"), + content=body, + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + ReportEventRestServlet(hs).register(http_server) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 57f14fd12..c8487c883 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 31 +SCHEMA_VERSION = 32 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 70aa64fb3..ceced7d51 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine import collections import logging +import ujson as json logger = logging.getLogger(__name__) @@ -221,3 +222,16 @@ class RoomStore(SQLBaseStore): aliases.extend(e.content['aliases']) defer.returnValue((name, aliases)) + + def add_event_report(self, room_id, event_id, user_id, reason, content): + return self._simple_insert( + table="event_reports", + values={ + "room_id": room_id, + "event_id": event_id, + "user_id": user_id, + "reason": reason, + "content": json.dumps(content), + }, + desc="add_event_report" + ) diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql new file mode 100644 index 000000000..06bf0d9b5 --- /dev/null +++ b/synapse/storage/schema/delta/32/reports.sql @@ -0,0 +1,23 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE event_reports( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + reason TEXT, + content TEXT +); From 8a04412fa1e837d733302038c64854ba0766efc0 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 4 May 2016 12:19:04 +0100 Subject: [PATCH 04/12] starting point for doc on how log contexts are supposed to work --- docs/log_contexts.rst | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 docs/log_contexts.rst diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst new file mode 100644 index 000000000..0046e171b --- /dev/null +++ b/docs/log_contexts.rst @@ -0,0 +1,10 @@ +What do I do about "Unexpected logging context" debug log-lines everywhere? + + The logging context lives in thread local storage + Sometimes it gets out of sync with what it should actually be, usually because something scheduled something to run on the reactor without preserving the logging context. + what is the impact of it getting out of sync? and how and when should we preserve log context? + The impact is that some of the CPU and database metrics will be under-reported, and some log lines will be mis-attributed. + It should happen auto-magically in all the APIs that do IO or otherwise defer to the reactor. + Mjark: the other place is if we branch, e.g. using defer.gatherResults + +Unanswered: how and when should we preserve log context? \ No newline at end of file From 5650e38e7de4cf89074ff84f4ecfbfcd81fa810d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 May 2016 13:19:39 +0100 Subject: [PATCH 05/12] Move event_id to path --- synapse/rest/client/v2_alpha/report_event.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py index 412e5b190..9c1c9662c 100644 --- a/synapse/rest/client/v2_alpha/report_event.py +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class ReportEventRestServlet(RestServlet): PATTERNS = client_v2_patterns( - "/rooms/(?P[^/]*)/report$" + "/rooms/(?P[^/]*)/report/(?P[^/]*)$" ) def __init__(self, hs): @@ -36,14 +36,12 @@ class ReportEventRestServlet(RestServlet): self.store = hs.get_datastore() @defer.inlineCallbacks - def on_POST(self, request, room_id): + def on_POST(self, request, room_id, event_id): requester = yield self.auth.get_user_by_req(request) user_id = requester.user.to_string() body = parse_json_object_from_request(request) - event_id = body["event_id"] - yield self.store.add_event_report( room_id=room_id, event_id=event_id, From 8e6a163f2762b3f62ae9b350c5050bc2318ec268 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 May 2016 15:19:12 +0100 Subject: [PATCH 06/12] Add timestamp and auto incrementing ID --- synapse/rest/client/v2_alpha/report_event.py | 2 ++ synapse/storage/__init__.py | 1 + synapse/storage/room.py | 6 +++++- synapse/storage/schema/delta/32/reports.sql | 2 ++ 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py index 9c1c9662c..8903e1240 100644 --- a/synapse/rest/client/v2_alpha/report_event.py +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -33,6 +33,7 @@ class ReportEventRestServlet(RestServlet): super(ReportEventRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() + self.clock = hs.get_clock() self.store = hs.get_datastore() @defer.inlineCallbacks @@ -48,6 +49,7 @@ class ReportEventRestServlet(RestServlet): user_id=user_id, reason=body.get("reason"), content=body, + received_ts=self.clock.time_msec(), ) defer.returnValue((200, {})) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 045ae6c03..7122b0cbb 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -114,6 +114,7 @@ class DataStore(RoomMemberStore, RoomStore, self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") + self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id") self._push_rules_stream_id_gen = ChainedIdGenerator( diff --git a/synapse/storage/room.py b/synapse/storage/room.py index ceced7d51..26933e593 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -223,10 +223,14 @@ class RoomStore(SQLBaseStore): defer.returnValue((name, aliases)) - def add_event_report(self, room_id, event_id, user_id, reason, content): + def add_event_report(self, room_id, event_id, user_id, reason, content, + received_ts): + next_id = self._event_reports_id_gen.get_next() return self._simple_insert( table="event_reports", values={ + "id": next_id, + "received_ts": received_ts, "room_id": room_id, "event_id": event_id, "user_id": user_id, diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql index 06bf0d9b5..3f2502745 100644 --- a/synapse/storage/schema/delta/32/reports.sql +++ b/synapse/storage/schema/delta/32/reports.sql @@ -15,6 +15,8 @@ CREATE TABLE event_reports( + id BIGINT NOT NULL, + received_ts BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, user_id TEXT NOT NULL, From fcd1eb642dc8b73e372b78bb788e3a17a5e40ace Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 May 2016 16:51:51 +0100 Subject: [PATCH 07/12] Add primary key --- synapse/storage/schema/delta/32/reports.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql index 3f2502745..d13609776 100644 --- a/synapse/storage/schema/delta/32/reports.sql +++ b/synapse/storage/schema/delta/32/reports.sql @@ -15,7 +15,7 @@ CREATE TABLE event_reports( - id BIGINT NOT NULL, + id BIGINT NOT NULL PRIMARY KEY, received_ts BIGINT NOT NULL, room_id TEXT NOT NULL, event_id TEXT NOT NULL, From 9c272da05fcf51534aaa877647bc3b82bf841cf3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 May 2016 13:42:44 +0100 Subject: [PATCH 08/12] Add an openidish mechanism for proving to third parties that you own a given user_id --- synapse/federation/federation_server.py | 5 ++ synapse/federation/transport/server.py | 47 ++++++++++- synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/openid.py | 96 ++++++++++++++++++++++ synapse/storage/__init__.py | 4 +- synapse/storage/openid.py | 32 ++++++++ synapse/storage/schema/delta/32/openid.sql | 9 ++ 7 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 synapse/rest/client/v2_alpha/openid.py create mode 100644 synapse/storage/openid.py create mode 100644 synapse/storage/schema/delta/32/openid.sql diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 429ab6dde..f1d231b9d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -387,6 +387,11 @@ class FederationServer(FederationBase): "events": [ev.get_pdu_json(time_now) for ev in missing_events], }) + @log_function + def on_openid_userinfo(self, token): + ts_now_ms = self._clock.time_msec() + return self.store.get_user_id_for_open_id_token(token, ts_now_ms) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3e552b6c4..5b6c7d11d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request, parse_string from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -448,6 +448,50 @@ class On3pidBindServlet(BaseFederationServlet): return code +class OpenIdUserInfo(BaseFederationServlet): + """ + Exchange a bearer token for information about a user. + + The response format should be compatible with: + http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse + + GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "sub": "@userpart:example.org", + } + """ + + PATH = "/openid/userinfo" + + @defer.inlineCallbacks + def on_GET(self, request): + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -468,6 +512,7 @@ SERVLET_CLASSES = ( FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, + OpenIdUserInfo, ) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index e805cb911..8b223e032 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -45,6 +45,7 @@ from synapse.rest.client.v2_alpha import ( tags, account_data, report_event, + openid, ) from synapse.http.server import JsonResource @@ -88,3 +89,4 @@ class ClientRestResource(JsonResource): tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) report_event.register_servlets(hs, client_resource) + openid.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py new file mode 100644 index 000000000..ddea75032 --- /dev/null +++ b/synapse/rest/client/v2_alpha/openid.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.api.errors import AuthError +from synapse.util.stringutils import random_string + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class IdTokenServlet(RestServlet): + """ + Get a bearer token that may be passed to a third party to confirm ownership + of a matrix user id. + + The format of the response could be made compatible with the format given + in http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse + + But instead of returning a signed "id_token" the response contains the + name of the issuing matrix homeserver. This means that for now the third + party will need to check the validity of the "id_token" against the + federation /openid/userinfo endpoint of the homeserver. + + Request: + + POST /user/{user_id}/openid/token?access_token=... HTTP/1.1 + + {} + + Response: + + HTTP/1.1 200 OK + { + "access_token": "ABDEFGH", + "token_type": "Bearer", + "matrix_server_name": "example.com", + "expires_in": 3600, + } + """ + PATTERNS = client_v2_patterns( + "/user/(?P[^/]*)/openid/token" + ) + + EXPIRES_MS = 3600 * 1000 + + def __init__(self, hs): + super(IdTokenServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.server_name = hs.config.server_name + + @defer.inlineCallbacks + def on_POST(self, request, user_id): + requester = yield self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot request tokens for other users.") + + # Parse the request body to make sure it's JSON, but ignore the contents + # for now. + parse_json_object_from_request(request) + + token = random_string(24) + ts_valid_until_ms = self.clock.time_msec() + self.EXPIRES_MS + + yield self.store.insert_open_id_token(token, ts_valid_until_ms, user_id) + + defer.returnValue((200, { + "access_token": token, + "token_type": "Bearer", + "matrix_server_name": self.server_name, + "expires_in": self.EXPIRES_MS / 1000, + })) + + +def register_servlets(hs, http_server): + IdTokenServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7122b0cbb..d970fde9e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644 index 000000000..5dabb607b --- /dev/null +++ b/synapse/storage/openid.py @@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644 index 000000000..36f37b11c --- /dev/null +++ b/synapse/storage/schema/delta/32/openid.sql @@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); From 573ef3f1c953542693a1784311154d3345caf5c1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 May 2016 15:15:00 +0100 Subject: [PATCH 09/12] Rename openid/token to openid/request_token --- synapse/rest/client/v2_alpha/openid.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py index ddea75032..aa1cae8e1 100644 --- a/synapse/rest/client/v2_alpha/openid.py +++ b/synapse/rest/client/v2_alpha/openid.py @@ -42,7 +42,7 @@ class IdTokenServlet(RestServlet): Request: - POST /user/{user_id}/openid/token?access_token=... HTTP/1.1 + POST /user/{user_id}/openid/request_token?access_token=... HTTP/1.1 {} @@ -57,7 +57,7 @@ class IdTokenServlet(RestServlet): } """ PATTERNS = client_v2_patterns( - "/user/(?P[^/]*)/openid/token" + "/user/(?P[^/]*)/openid/request_token" ) EXPIRES_MS = 3600 * 1000 From 56b5e83e36f22f3eab1ebf7a46b9f23f0c1a3e8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:20:18 +0100 Subject: [PATCH 10/12] Reduce database inserts when sending transactions --- synapse/handlers/presence.py | 2 +- synapse/storage/transactions.py | 157 +++++++++++++++++++++++--------- 2 files changed, 114 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328..639567953 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30 * 1000, self.clock.looping_call, self._handle_timeouts, 5000, diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0..17fc60198 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,54 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +146,28 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - next_id = self._transaction_id_gen.get_next() + # TODO: Fetch prev_txns + return self.runInteraction( + "prep_send_transaction", + self._get_prevs_txn, + destination, + ) + + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +182,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +193,21 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +331,46 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") From 1d275dba69272f6df5a79871dc4b8d31e71514d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:25:58 +0100 Subject: [PATCH 11/12] Don't needlessly enter transaction --- synapse/storage/transactions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 17fc60198..ba1969b24 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -159,10 +159,8 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - # TODO: Fetch prev_txns - return self.runInteraction( - "prep_send_transaction", + "_get_prevs_txn", self._get_prevs_txn, destination, ) @@ -350,11 +348,12 @@ class TransactionStore(SQLBaseStore): ] def f(txn): - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) for dest, txn_map in update_delivered.items(): for txn_id, update_row in txn_map.items(): @@ -371,6 +370,7 @@ class TransactionStore(SQLBaseStore): } ) - yield self.runInteraction("_persist_in_mem_txns", f) + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") From d13459636fab9637a43c950d17f883ca77b832f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:30:55 +0100 Subject: [PATCH 12/12] Pull prev txn from in memory --- synapse/storage/transactions.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ba1969b24..6c7481a72 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -58,6 +58,8 @@ class TransactionStore(SQLBaseStore): # Newly delivered transactions that *were* persisted while in flight self.update_delivered_transactions = {} + self.last_transaction = {} + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) hs.get_clock().looping_call( self._persist_in_mem_txns, @@ -159,11 +161,15 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. @@ -196,6 +202,8 @@ class TransactionStore(SQLBaseStore): destination, {} ).pop(transaction_id, None) + self.last_transaction[destination] = transaction_id + if txn_row: d = self.new_delivered_transactions.setdefault(destination, {}) d[transaction_id] = txn_row._replace(