forked from MirrorHub/synapse
67 lines
1.7 KiB
Python
67 lines
1.7 KiB
Python
import collections
|
|
import json
|
|
import sys
|
|
import time
|
|
|
|
import requests
|
|
|
|
Entry = collections.namedtuple("Entry", "name position rows")
|
|
|
|
ROW_TYPES = {}
|
|
|
|
|
|
def row_type_for_columns(name, column_names):
|
|
column_names = tuple(column_names)
|
|
row_type = ROW_TYPES.get((name, column_names))
|
|
if row_type is None:
|
|
row_type = collections.namedtuple(name, column_names)
|
|
ROW_TYPES[(name, column_names)] = row_type
|
|
return row_type
|
|
|
|
|
|
def parse_response(content):
|
|
streams = json.loads(content)
|
|
result = {}
|
|
for name, value in streams.items():
|
|
row_type = row_type_for_columns(name, value["field_names"])
|
|
position = value["position"]
|
|
rows = [row_type(*row) for row in value["rows"]]
|
|
result[name] = Entry(name, position, rows)
|
|
return result
|
|
|
|
|
|
def replicate(server, streams):
|
|
return parse_response(
|
|
requests.get(
|
|
server + "/_synapse/replication", verify=False, params=streams
|
|
).content
|
|
)
|
|
|
|
|
|
def main():
|
|
server = sys.argv[1]
|
|
|
|
streams = None
|
|
while not streams:
|
|
try:
|
|
streams = {
|
|
row.name: row.position
|
|
for row in replicate(server, {"streams": "-1"})["streams"].rows
|
|
}
|
|
except requests.exceptions.ConnectionError as e:
|
|
time.sleep(0.1)
|
|
|
|
while True:
|
|
try:
|
|
results = replicate(server, streams)
|
|
except Exception:
|
|
sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
|
|
break
|
|
for update in results.values():
|
|
for row in update.rows:
|
|
sys.stdout.write(repr(row) + "\n")
|
|
streams[update.name] = update.position
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|