Merge pull request #4954 from matrix-org/rav/refactor_parse_row

Refactors to replication stream row update/parsing
This commit is contained in:
Richard van der Hoff 2019-03-28 18:31:17 +00:00 committed by GitHub
commit d688a51736
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 5 deletions

1
changelog.d/4954.misc Normal file
View file

@ -0,0 +1 @@
Refactor replication row generation/parsing.

View file

@ -105,13 +105,14 @@ class ReplicationClientHandler(object):
def on_rdata(self, stream_name, token, rows): def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token. """Called to handle a batch of replication data with a given stream token.
By default this just pokes the slave store. Can be overriden in subclasses to By default this just pokes the slave store. Can be overridden in subclasses to
handle more. handle more.
Args: Args:
stream_name (str): name of the replication stream for this batch of rows stream_name (str): name of the replication stream for this batch of rows
token (int): stream token for this batch of rows token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects. rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns: Returns:
Deferred|None Deferred|None

View file

@ -605,7 +605,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
inbound_rdata_count.labels(stream_name).inc() inbound_rdata_count.labels(stream_name).inc()
try: try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) row = STREAMS_MAP[stream_name].parse_row(cmd.row)
except Exception: except Exception:
logger.exception( logger.exception(
"[%s] Failed to parse RDATA: %r %r", "[%s] Failed to parse RDATA: %r %r",

View file

@ -112,9 +112,24 @@ class Stream(object):
time it was called up until the point `advance_current_token` was called. time it was called up until the point `advance_current_token` was called.
""" """
NAME = None # The name of the stream NAME = None # The name of the stream
ROW_TYPE = None # The type of the row ROW_TYPE = None # The type of the row. Used by the default impl of parse_row.
_LIMITED = True # Whether the update function takes a limit _LIMITED = True # Whether the update function takes a limit
@classmethod
def parse_row(cls, row):
"""Parse a row received over replication
By default, assumes that the row data is an array object and passes its contents
to the constructor of the ROW_TYPE for this stream.
Args:
row: row data from the incoming RDATA command, after json decoding
Returns:
ROW_TYPE object for this stream
"""
return cls.ROW_TYPE(*row)
def __init__(self, hs): def __init__(self, hs):
# The token from which we last asked for updates # The token from which we last asked for updates
self.last_token = self.current_token() self.last_token = self.current_token()
@ -186,7 +201,7 @@ class Stream(object):
from_token, current_token, from_token, current_token,
) )
updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] updates = [(row[0], row[1:]) for row in rows]
# check we didn't get more rows than the limit. # check we didn't get more rows than the limit.
# doing it like this allows the update_function to be a generator. # doing it like this allows the update_function to be a generator.