synapse/scripts-dev/tail-synapse.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

68 lines
1.7 KiB
Python
Raw Permalink Normal View History

import collections
import json
import sys
import time
import requests
Entry = collections.namedtuple("Entry", "name position rows")
ROW_TYPES = {}
def row_type_for_columns(name, column_names):
column_names = tuple(column_names)
row_type = ROW_TYPES.get((name, column_names))
if row_type is None:
row_type = collections.namedtuple(name, column_names)
ROW_TYPES[(name, column_names)] = row_type
return row_type
def parse_response(content):
streams = json.loads(content)
result = {}
for name, value in streams.items():
row_type = row_type_for_columns(name, value["field_names"])
position = value["position"]
rows = [row_type(*row) for row in value["rows"]]
result[name] = Entry(name, position, rows)
return result
def replicate(server, streams):
return parse_response(
requests.get(
server + "/_synapse/replication", verify=False, params=streams
).content
)
def main():
server = sys.argv[1]
streams = None
while not streams:
try:
streams = {
row.name: row.position
for row in replicate(server, {"streams": "-1"})["streams"].rows
}
except requests.exceptions.ConnectionError:
time.sleep(0.1)
while True:
try:
results = replicate(server, streams)
except Exception:
sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
break
for update in results.values():
for row in update.rows:
sys.stdout.write(repr(row) + "\n")
streams[update.name] = update.position
if __name__ == "__main__":
main()