From 4e49f52375acb705b32937115210d055b0e1ce38 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Apr 2015 17:36:37 +0100 Subject: [PATCH] Don't port over all of the sent_transactions table --- scripts/port_from_sqlite_to_postgres.py | 162 ++++++++++++++++++------ 1 file changed, 122 insertions(+), 40 deletions(-) diff --git a/scripts/port_from_sqlite_to_postgres.py b/scripts/port_from_sqlite_to_postgres.py index f4b6ed068..19e35bf80 100644 --- a/scripts/port_from_sqlite_to_postgres.py +++ b/scripts/port_from_sqlite_to_postgres.py @@ -122,8 +122,8 @@ class Store(object): return self.db_pool.runWithConnection(r) - def execute(self, f): - return self.runInteraction(f.__name__, f) + def execute(self, f, *args, **kwargs): + return self.runInteraction(f.__name__, f, *args, **kwargs) def insert_many_txn(self, txn, table, headers, rows): sql = "INSERT INTO %s (%s) VALUES (%s)" % ( @@ -347,9 +347,118 @@ class Porter(object): def __init__(self, **kwargs): self.__dict__.update(kwargs) + def convert_rows(self, table, headers, rows): + bool_col_names = BOOLEAN_COLUMNS.get(table, []) + + bool_cols = [ + i for i, h in enumerate(headers) if h in bool_col_names + ] + + def conv(j, col): + if j in bool_cols: + return bool(col) + return col + + for i, row in enumerate(rows): + rows[i] = tuple( + self.postgres_store.database_engine.encode_parameter( + conv(j, col) + ) + for j, col in enumerate(row) + if j > 0 + ) + @defer.inlineCallbacks def handle_table(self, table): - if table in APPEND_ONLY_TABLES: + def delete_all(txn): + txn.execute( + "DELETE FROM port_from_sqlite3 WHERE table_name = %s", + (table,) + ) + txn.execute("TRUNCATE %s CASCADE" % (table,)) + + def get_table_size(txn): + txn.execute("SELECT count(*) FROM %s" % (table,)) + size, = txn.fetchone() + return int(size) + + if table == "sent_transactions": + # This is a big table, and we really only need some of the recent + # data + yield self.postgres_store.execute(delete_all) + + # Only save things from the last day + yesterday = 1429114568820 #int(time.time()*1000) - 86400000 + + # And save the max transaction id from each destination + select = ( + "SELECT rowid, * FROM sent_transactions WHERE rowid IN (" + "SELECT max(rowid) FROM sent_transactions" + " GROUP BY destination" + ")" + ) + + def r(txn): + txn.execute(select) + rows = txn.fetchall() + headers = [column[0] for column in txn.description] + + ts_ind = headers.index('ts') + + return headers, [r for r in rows if r[ts_ind] < yesterday] + + headers, rows = yield self.sqlite_store.runInteraction( + "select", r, + ) + + self.convert_rows(table, headers, rows) + + inserted_rows = len(rows) + max_inserted_rowid = max(r[0] for r in rows) + + def insert(txn): + self.postgres_store.insert_many_txn( + txn, table, headers[1:], rows + ) + + yield self.postgres_store.execute(insert) + + def get_start_id(txn): + txn.execute( + "SELECT rowid FROM sent_transactions WHERE ts >= ?" + " ORDER BY rowid ASC LIMIT 1", + (yesterday,) + ) + + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + return 1 + + next_chunk = yield self.sqlite_store.execute(get_start_id) + next_chunk = max(max_inserted_rowid + 1, next_chunk) + + yield self.postgres_store._simple_insert( + table="port_from_sqlite3", + values={"table_name": table, "rowid": next_chunk} + ) + + def get_sent_table_size(txn): + txn.execute( + "SELECT count(*) FROM sent_transactions" + " WHERE ts >= ?", + (yesterday,) + ) + size, = txn.fetchone() + return int(size) + + table_size = yield self.sqlite_store.execute( + get_sent_table_size + ) + + table_size += inserted_rows + elif table in APPEND_ONLY_TABLES: # It's safe to just carry on inserting. next_chunk = yield self.postgres_store._simple_select_one_onecol( table="port_from_sqlite3", @@ -365,28 +474,18 @@ class Porter(object): ) next_chunk = 1 - else: - def delete_all(txn): - txn.execute( - "DELETE FROM port_from_sqlite3 WHERE table_name = %s", - (table,) - ) - txn.execute("TRUNCATE %s CASCADE" % (table,)) - self.postgres_store._simple_insert_txn( - txn, - table="port_from_sqlite3", - values={"table_name": table, "rowid": 0} - ) - yield self.postgres_store.execute(delete_all) + table_size = yield self.sqlite_store.execute(get_table_size) + else: + yield self.postgres_store.execute(delete_all) + self.postgres_store._simple_insert( + table="port_from_sqlite3", + values={"table_name": table, "rowid": 0} + ) + + table_size = yield self.sqlite_store.execute(get_table_size) next_chunk = 1 - def get_table_size(txn): - txn.execute("SELECT count(*) FROM %s" % (table,)) - size, = txn.fetchone() - return int(size) - - table_size = yield self.sqlite_store.execute(get_table_size) postgres_size = yield self.postgres_store.execute(get_table_size) if not table_size: @@ -399,8 +498,6 @@ class Porter(object): % (table,) ) - bool_col_names = BOOLEAN_COLUMNS.get(table, []) - while True: def r(txn): txn.execute(select, (next_chunk, self.batch_size,)) @@ -412,24 +509,9 @@ class Porter(object): headers, rows = yield self.sqlite_store.runInteraction("select", r) if rows: - bool_cols = [ - i for i, h in enumerate(headers) if h in bool_col_names - ] next_chunk = rows[-1][0] + 1 - def conv(j, col): - if j in bool_cols: - return bool(col) - return col - - for i, row in enumerate(rows): - rows[i] = tuple( - self.postgres_store.database_engine.encode_parameter( - conv(j, col) - ) - for j, col in enumerate(row) - if j > 0 - ) + self.convert_rows(table, headers, rows) def insert(txn): self.postgres_store.insert_many_txn(