forked from MirrorHub/synapse
Add support for storing rejected events in EventContext and data stores
This commit is contained in:
parent
b390bf39f2
commit
b1b85753d7
6 changed files with 83 additions and 11 deletions
|
@ -20,3 +20,4 @@ class EventContext(object):
|
||||||
self.current_state = current_state
|
self.current_state = current_state
|
||||||
self.auth_events = auth_events
|
self.auth_events = auth_events
|
||||||
self.state_group = None
|
self.state_group = None
|
||||||
|
self.rejected = False
|
||||||
|
|
|
@ -30,6 +30,7 @@ from .transactions import TransactionStore
|
||||||
from .keys import KeyStore
|
from .keys import KeyStore
|
||||||
from .event_federation import EventFederationStore
|
from .event_federation import EventFederationStore
|
||||||
from .media_repository import MediaRepositoryStore
|
from .media_repository import MediaRepositoryStore
|
||||||
|
from .rejections import RejectionsStore
|
||||||
|
|
||||||
from .state import StateStore
|
from .state import StateStore
|
||||||
from .signatures import SignatureStore
|
from .signatures import SignatureStore
|
||||||
|
@ -66,7 +67,7 @@ SCHEMAS = [
|
||||||
|
|
||||||
# Remember to update this number every time an incompatible change is made to
|
# Remember to update this number every time an incompatible change is made to
|
||||||
# database schema files, so the users will be informed on server restarts.
|
# database schema files, so the users will be informed on server restarts.
|
||||||
SCHEMA_VERSION = 11
|
SCHEMA_VERSION = 12
|
||||||
|
|
||||||
|
|
||||||
class _RollbackButIsFineException(Exception):
|
class _RollbackButIsFineException(Exception):
|
||||||
|
@ -82,6 +83,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
DirectoryStore, KeyStore, StateStore, SignatureStore,
|
DirectoryStore, KeyStore, StateStore, SignatureStore,
|
||||||
EventFederationStore,
|
EventFederationStore,
|
||||||
MediaRepositoryStore,
|
MediaRepositoryStore,
|
||||||
|
RejectionsStore,
|
||||||
):
|
):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -224,6 +226,9 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
if not outlier:
|
if not outlier:
|
||||||
self._store_state_groups_txn(txn, event, context)
|
self._store_state_groups_txn(txn, event, context)
|
||||||
|
|
||||||
|
if context.rejected:
|
||||||
|
self._store_rejections_txn(txn, event.event_id, context.rejected)
|
||||||
|
|
||||||
if current_state:
|
if current_state:
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"DELETE FROM current_state_events WHERE room_id = ?",
|
"DELETE FROM current_state_events WHERE room_id = ?",
|
||||||
|
@ -262,7 +267,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
or_replace=True,
|
or_replace=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if is_new_state:
|
if is_new_state and not context.rejected:
|
||||||
self._simple_insert_txn(
|
self._simple_insert_txn(
|
||||||
txn,
|
txn,
|
||||||
"current_state_events",
|
"current_state_events",
|
||||||
|
@ -288,7 +293,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
or_ignore=True,
|
or_ignore=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled and not context.rejected:
|
||||||
self._simple_insert_txn(
|
self._simple_insert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="state_forward_extremities",
|
table="state_forward_extremities",
|
||||||
|
|
|
@ -458,10 +458,12 @@ class SQLBaseStore(object):
|
||||||
return [e for e in events if e]
|
return [e for e in events if e]
|
||||||
|
|
||||||
def _get_event_txn(self, txn, event_id, check_redacted=True,
|
def _get_event_txn(self, txn, event_id, check_redacted=True,
|
||||||
get_prev_content=False):
|
get_prev_content=False, allow_rejected=False):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT internal_metadata, json, r.event_id FROM event_json as e "
|
"SELECT internal_metadata, json, r.event_id, reason "
|
||||||
|
"FROM event_json as e "
|
||||||
"LEFT JOIN redactions as r ON e.event_id = r.redacts "
|
"LEFT JOIN redactions as r ON e.event_id = r.redacts "
|
||||||
|
"LEFT JOIN rejections as rej on rej.event_id = e.event_id "
|
||||||
"WHERE e.event_id = ? "
|
"WHERE e.event_id = ? "
|
||||||
"LIMIT 1 "
|
"LIMIT 1 "
|
||||||
)
|
)
|
||||||
|
@ -473,13 +475,16 @@ class SQLBaseStore(object):
|
||||||
if not res:
|
if not res:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
internal_metadata, js, redacted = res
|
internal_metadata, js, redacted, rejected_reason = res
|
||||||
|
|
||||||
|
if allow_rejected or not rejected_reason:
|
||||||
return self._get_event_from_row_txn(
|
return self._get_event_from_row_txn(
|
||||||
txn, internal_metadata, js, redacted,
|
txn, internal_metadata, js, redacted,
|
||||||
check_redacted=check_redacted,
|
check_redacted=check_redacted,
|
||||||
get_prev_content=get_prev_content,
|
get_prev_content=get_prev_content,
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
|
def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
|
||||||
check_redacted=True, get_prev_content=False):
|
check_redacted=True, get_prev_content=False):
|
||||||
|
|
33
synapse/storage/rejections.py
Normal file
33
synapse/storage/rejections.py
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2014, 2015 OpenMarket Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RejectionsStore(SQLBaseStore):
|
||||||
|
def _store_rejections_txn(self, txn, event_id, reason):
|
||||||
|
self._simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="rejections",
|
||||||
|
values={
|
||||||
|
"event_id": event_id,
|
||||||
|
"reason": reason,
|
||||||
|
"last_failure": self._clock.time_msec(),
|
||||||
|
}
|
||||||
|
)
|
21
synapse/storage/schema/delta/v12.sql
Normal file
21
synapse/storage/schema/delta/v12.sql
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
/* Copyright 2015 OpenMarket Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS rejections(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
reason TEXT NOT NULL,
|
||||||
|
last_check TEXT NOT NULL,
|
||||||
|
CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
|
||||||
|
);
|
|
@ -123,3 +123,10 @@ CREATE TABLE IF NOT EXISTS room_hosts(
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
|
CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS rejections(
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
reason TEXT NOT NULL,
|
||||||
|
last_check TEXT NOT NULL,
|
||||||
|
CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
|
||||||
|
);
|
||||||
|
|
Loading…
Reference in a new issue