Merge pull request #6186 from matrix-org/erikj/disable_sql_bytes
Disable bytes usage with postgres
commit 329eae9cda
8 changed files with 65 additions and 15 deletions
changelog.d/6186.bugfix (new file, 1 addition)

@@ -0,0 +1 @@
+Fix bug where we were updating censored events as bytes rather than text, occasionally causing invalid JSON to be inserted, breaking APIs that attempted to fetch such events.
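
For context on how the invalid JSON arises: psycopg2 adapts a Python bytes parameter to a bytea literal, and PostgreSQL's assignment cast from bytea to a text column stores the hex output form ('\x7b...') rather than the original string. A minimal sketch of the failure mode, assuming psycopg2 and a scratch database (the DSN and table are illustrative, not Synapse's schema):

    import json
    import psycopg2

    conn = psycopg2.connect("dbname=scratch")  # hypothetical DSN
    with conn, conn.cursor() as cur:
        cur.execute("CREATE TEMP TABLE event_json_demo (json text)")
        # bytes parameter -> bytea literal -> hex text in the column
        cur.execute(
            "INSERT INTO event_json_demo (json) VALUES (%s)",
            (json.dumps({"type": "m.room.message"}).encode("utf8"),),
        )
        cur.execute("SELECT json FROM event_json_demo")
        print(cur.fetchone()[0])  # starts with '\x7b', not '{' -- invalid JSON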

@@ -270,7 +270,7 @@ class PreviewUrlResource(DirectServeResource):
 
         logger.debug("Calculated OG for %s as %s" % (url, og))
 
-        jsonog = json.dumps(og).encode("utf8")
+        jsonog = json.dumps(og)
 
         # store OG in history-aware DB cache
         yield self.store.store_url_cache(

@@ -283,7 +283,7 @@ class PreviewUrlResource(DirectServeResource):
             media_info["created_ts"],
         )
 
-        return jsonog
+        return jsonog.encode("utf8")
 
     @defer.inlineCallbacks
     def _download_url(self, url, user):
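
Taken together, these two hunks move the UTF-8 encode from the point where the Open Graph JSON is computed to the point where it is written back as the HTTP response: the value cached via store_url_cache is now a text string, so it no longer hits the bytes ban introduced in the engine change below.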

@@ -22,6 +22,13 @@ class PostgresEngine(object):
     def __init__(self, database_module, database_config):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+        # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
+        # actually want to use bytes then wrap it in `bytearray`.
+        def _disable_bytes_adapter(_):
+            raise Exception("Passing bytes to DB is disabled.")
+
+        self.module.extensions.register_adapter(bytes, _disable_bytes_adapter)
         self.synchronous_commit = database_config.get("synchronous_commit", True)
         self._version = None  # unknown as yet
 
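
The hook being used here is stock psycopg2: register_adapter controls how a Python type is rendered into SQL, and an adapter that raises turns any stray bytes parameter into an immediate failure at execute() time, while bytearray keeps its default binary adaptation. A standalone sketch outside Synapse's cursor wrapper (hence psycopg2's %s placeholders rather than ?; the DSN is hypothetical):

    import psycopg2
    from psycopg2 import extensions

    def _disable_bytes_adapter(_):
        raise Exception("Passing bytes to DB is disabled.")

    extensions.register_adapter(bytes, _disable_bytes_adapter)

    conn = psycopg2.connect("dbname=scratch")  # hypothetical DSN
    with conn, conn.cursor() as cur:
        cur.execute("SELECT %s", (bytearray(b"ok"),))  # fine: default bytea adapter
        cur.execute("SELECT %s", (b"boom",))           # raises at adapt time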

@@ -71,6 +71,19 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             "redactions_received_ts", self._redactions_received_ts
         )
 
+        # This index gets deleted in the `event_fix_redactions_bytes` update
+        self.register_background_index_update(
+            "event_fix_redactions_bytes_create_index",
+            index_name="redactions_censored_redacts",
+            table="redactions",
+            columns=["redacts"],
+            where_clause="have_censored",
+        )
+
+        self.register_background_update_handler(
+            "event_fix_redactions_bytes", self._event_fix_redactions_bytes
+        )
+
     @defer.inlineCallbacks
     def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
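
register_background_index_update queues the index build through the background_updates table instead of blocking startup; on postgres the statement it eventually executes is roughly the following sketch (derived from the arguments above; CONCURRENTLY cannot run inside a transaction, hence autocommit):

    conn.set_session(autocommit=True)
    with conn.cursor() as cur:
        cur.execute(
            "CREATE INDEX CONCURRENTLY redactions_censored_redacts"
            " ON redactions (redacts) WHERE have_censored"
        )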

@@ -458,3 +471,33 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         yield self._end_background_update("redactions_received_ts")
 
         return count
+
+    @defer.inlineCallbacks
+    def _event_fix_redactions_bytes(self, progress, batch_size):
+        """Undoes hex-encoded censored redacted event JSON.
+        """
+
+        def _event_fix_redactions_bytes_txn(txn):
+            # This update is quite fast due to the new index.
+            txn.execute(
+                """
+                UPDATE event_json
+                SET
+                    json = convert_from(json::bytea, 'utf8')
+                FROM redactions
+                WHERE
+                    redactions.have_censored
+                    AND event_json.event_id = redactions.redacts
+                    AND json NOT LIKE '{%';
+                """
+            )
+
+            txn.execute("DROP INDEX redactions_censored_redacts")
+
+        yield self.runInteraction(
+            "_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
+        )
+
+        yield self._end_background_update("event_fix_redactions_bytes")
+
+        return 1
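
The UPDATE works because the broken rows hold the text form of a bytea literal: casting that text back to bytea parses the hex, and convert_from decodes it as UTF-8, recovering the original JSON; the json NOT LIKE '{%' guard skips healthy rows and makes the update safe to re-run. A quick sanity check of the expression (the cursor is illustrative; any postgres connection will do):

    # '\x7b2261223a20317d' is the hex bytea form of '{"a": 1}'
    cur.execute("SELECT convert_from('\\x7b2261223a20317d'::bytea, 'utf8')")
    print(cur.fetchone()[0])  # {"a": 1}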

@@ -51,7 +51,7 @@ class FilteringStore(SQLBaseStore):
                 "SELECT filter_id FROM user_filters "
                 "WHERE user_id = ? AND filter_json = ?"
             )
-            txn.execute(sql, (user_localpart, def_json))
+            txn.execute(sql, (user_localpart, bytearray(def_json)))
             filter_id_response = txn.fetchone()
             if filter_id_response is not None:
                 return filter_id_response[0]

@@ -68,7 +68,7 @@ class FilteringStore(SQLBaseStore):
                 "INSERT INTO user_filters (user_id, filter_id, filter_json)"
                 "VALUES(?, ?, ?)"
             )
-            txn.execute(sql, (user_localpart, filter_id, def_json))
+            txn.execute(sql, (user_localpart, filter_id, bytearray(def_json)))
 
             return filter_id
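
Unlike event_json above, filter_json is a genuinely binary column: the canonical-JSON blob is meant to be stored as bytes. These two call sites therefore opt back in by wrapping the value in bytearray, which the disabled-bytes adapter deliberately leaves alone. The pusher store and the federation test below get the same one-line treatment.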

@@ -241,7 +241,7 @@ class PusherStore(PusherWorkerStore):
                     "device_display_name": device_display_name,
                     "ts": pushkey_ts,
                     "lang": lang,
-                    "data": encode_canonical_json(data),
+                    "data": bytearray(encode_canonical_json(data)),
                     "last_stream_ordering": last_stream_ordering,
                     "profile_tag": profile_tag,
                     "id": stream_id,
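
encode_canonical_json, from the canonicaljson package, returns bytes (keys sorted, compact separators), which is why the pusher data needs the same bytearray wrapper to reach its binary column. For example:

    from canonicaljson import encode_canonical_json

    blob = encode_canonical_json({"b": 2, "a": 1})
    print(blob)        # b'{"a":1,"b":2}'
    print(type(blob))  # <class 'bytes'>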

@@ -15,12 +15,11 @@
 
 -- There was a bug where we may have updated censored redactions as bytes,
--- which can (somehow) cause json to be inserted hex encoded. This goes and
--- undoes any such hex encoded JSON.
-UPDATE event_json SET json = convert_from(json::bytea, 'utf8')
-WHERE event_id IN (
-    SELECT event_json.event_id
-    FROM event_json
-    INNER JOIN redactions ON (event_json.event_id = redacts)
-    WHERE have_censored AND json NOT LIKE '{%'
-);
+-- which can (somehow) cause json to be inserted hex encoded. These updates go
+-- and undo any such hex encoded JSON.
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_fix_redactions_bytes_create_index', '{}');
+
+INSERT into background_updates (update_name, progress_json, depends_on)
+    VALUES ('event_fix_redactions_bytes', '{}', 'event_fix_redactions_bytes_create_index');
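
Note the shape of the replacement: rather than running a potentially slow UPDATE inline while the schema delta is applied (blocking startup on large databases), the delta now only queues the two background updates defined above, and the depends_on column ensures the partial index is built before _event_fix_redactions_bytes starts rewriting rows.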

@@ -57,7 +57,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
                     "(event_id, algorithm, hash) "
                     "VALUES (?, 'sha256', ?)"
                 ),
-                (event_id, b"ffff"),
+                (event_id, bytearray(b"ffff")),
             )
 
         for i in range(0, 11):