Add support for using executemany

Erik Johnston 2015-05-05 15:13:25 +01:00
parent 1692dc019d
commit 43c2e8deae
5 changed files with 99 additions and 61 deletions
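
For context: a DB-API cursor's execute() runs a statement once with a single parameter tuple, while executemany() runs the same statement once per row of parameters in one call, which is what lets the stores below collapse their insert loops. A minimal standalone sketch using the standard sqlite3 module (table and values invented for illustration):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    txn = conn.cursor()
    txn.execute("CREATE TABLE event_edges (event_id TEXT, prev_event_id TEXT)")

    # One statement, one row.
    txn.execute("INSERT INTO event_edges VALUES(?, ?)", ("$b", "$a"))

    # One statement, many rows; the driver iterates the parameter list.
    txn.executemany(
        "INSERT INTO event_edges VALUES(?, ?)",
        [("$c", "$a"), ("$d", "$b")],
    )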

--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py

@@ -160,18 +160,23 @@ class LoggingTransaction(object):
     def __setattr__(self, name, value):
         setattr(self.txn, name, value)

-    def execute(self, sql, *args, **kwargs):
+    def execute(self, sql, *args):
+        self._do_execute(self.txn.execute, sql, *args)
+
+    def executemany(self, sql, *args):
+        self._do_execute(self.txn.executemany, sql, *args)
+
+    def _do_execute(self, func, sql, *args):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)

         sql = self.database_engine.convert_param_style(sql)

-        if args and args[0]:
+        if args:
             try:
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
-                    self.name,
-                    *args[0]
+                    "[SQL values] {%s} %r",
+                    self.name, args[0]
                 )
             except:
                 # Don't let logging failures stop SQL from working
@@ -180,8 +185,8 @@ class LoggingTransaction(object):
         start = time.time() * 1000
         try:
-            return self.txn.execute(
-                sql, *args, **kwargs
+            return func(
+                sql, *args
             )
         except Exception as e:
             logger.debug("[SQL FAIL] {%s} %s", self.name, e)
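
Both entry points now funnel through _do_execute, so executemany gets the same "[SQL]" debug logging, parameter-style conversion and timing as execute. A rough sketch of how a transaction callback drives the wrapper (SQL and values invented for illustration):

    # txn is a LoggingTransaction wrapping a DB-API cursor
    txn.execute("DELETE FROM event_edges WHERE event_id = ?", ("$a",))
    txn.executemany(
        "INSERT INTO event_edges VALUES(?, ?)",
        [("$b", "$a"), ("$c", "$a")],
    )
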
@@ -434,18 +439,41 @@ class SQLBaseStore(object):
     @log_function
     def _simple_insert_txn(self, txn, table, values):
+        keys, vals = zip(*values.items())
+
         sql = "INSERT INTO %s (%s) VALUES(%s)" % (
             table,
-            ", ".join(k for k in values),
-            ", ".join("?" for k in values)
+            ", ".join(k for k in keys),
+            ", ".join("?" for _ in keys)
         )

-        logger.debug(
-            "[SQL] %s Args=%s",
-            sql, values.values(),
-        )
+        txn.execute(sql, vals)
+
+    def _simple_insert_many_txn(self, txn, table, values):
+        if not values:
+            return
+
+        keys, vals = zip(*[
+            zip(
+                *(sorted(i.items(), key=lambda kv: kv[0]))
+            )
+            for i in values
+            if i
+        ])
+
+        for k in keys:
+            if k != keys[0]:
+                raise RuntimeError(
+                    "All items must have the same keys"
+                )
+
+        sql = "INSERT INTO %s (%s) VALUES(%s)" % (
+            table,
+            ", ".join(k for k in keys[0]),
+            ", ".join("?" for _ in keys[0])
+        )

-        txn.execute(sql, values.values())
+        txn.executemany(sql, vals)

     def _simple_upsert(self, table, keyvalues, values,
                        insertion_values={}, desc="_simple_upsert", lock=True):
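
The nested zip in _simple_insert_many_txn is the dense part: it pivots a list of row dicts into one tuple of per-row key tuples and one tuple of per-row value tuples, sorting each row's items so every row lists its columns in the same order (hence the RuntimeError guard when rows disagree). A standalone sketch, with row values invented for illustration:

    rows = [
        {"event_id": "$a", "room_id": "!r"},
        {"event_id": "$b", "room_id": "!r"},
    ]

    keys, vals = zip(*[
        zip(*(sorted(i.items(), key=lambda kv: kv[0])))
        for i in rows
        if i  # skip empty dicts
    ])

    # keys == (('event_id', 'room_id'), ('event_id', 'room_id'))
    # vals == (('$a', '!r'), ('$b', '!r'))

keys[0] then supplies the column list for the INSERT, and vals is exactly the sequence of parameter tuples that executemany expects.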

--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py

@@ -262,18 +262,19 @@ class EventFederationStore(SQLBaseStore):
         For the given event, update the event edges table and forward and
         backward extremities tables.
         """
-        for e_id, _ in prev_events:
-            # TODO (erikj): This could be done as a bulk insert
-            self._simple_insert_txn(
-                txn,
-                table="event_edges",
-                values={
-                    "event_id": event_id,
-                    "prev_event_id": e_id,
-                    "room_id": room_id,
-                    "is_state": False,
-                },
-            )
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": event_id,
+                    "prev_event_id": e_id,
+                    "room_id": room_id,
+                    "is_state": False,
+                }
+                for e_id, _ in prev_events
+            ],
+        )

         # Update the extremities table if this is not an outlier.
         if not outlier:
@@ -307,16 +308,17 @@ class EventFederationStore(SQLBaseStore):

             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
-            for e_id, _ in prev_events:
-                # TODO (erikj): This could be done as a bulk insert
-                self._simple_insert_txn(
-                    txn,
-                    table="event_backward_extremities",
-                    values={
-                        "event_id": e_id,
-                        "room_id": room_id,
-                    },
-                )
+            self._simple_insert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                values=[
+                    {
+                        "event_id": e_id,
+                        "room_id": room_id,
+                    }
+                    for e_id, _ in prev_events
+                ],
+            )

         # Also delete from the backwards extremities table all ones that
         # reference events that we have already seen
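
Each of these conversions replaces N separate execute() calls with a single executemany(). Because _simple_insert_many_txn sorts each row's keys, the generated column list comes out in alphabetical order; roughly what now reaches the driver for the event_edges insert earlier in this file (sqlite parameter style; event and room IDs invented):

    txn.executemany(
        "INSERT INTO event_edges (event_id, is_state, prev_event_id, room_id)"
        " VALUES(?, ?, ?, ?)",
        [
            ("$new", False, "$prev1", "!room"),
            ("$new", False, "$prev2", "!room"),
        ],
    )
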

--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py

@@ -113,17 +113,19 @@ class EventsStore(SQLBaseStore):
                 keyvalues={"room_id": event.room_id},
             )

-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    },
-                )
+            self._simple_insert_many_txn(
+                txn,
+                "current_state_events",
+                [
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    }
+                    for s in current_state
+                ],
+            )

         if event.is_state() and is_new_state:
             if not backfilled and not context.rejected:
@@ -296,16 +298,18 @@ class EventsStore(SQLBaseStore):
                 txn, event.event_id, prev_event_id, alg, hash_bytes
             )

-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-            )
+        self._simple_insert_many_txn(
+            txn,
+            table="event_auth",
+            values=[
+                {
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                }
+                for auth_id, _ in event.auth_events
+            ],
+        )

         (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
         self._store_event_reference_hash_txn(
@@ -330,17 +334,19 @@ class EventsStore(SQLBaseStore):
             vals,
         )

-        for e_id, h in event.prev_state:
-            self._simple_insert_txn(
-                txn,
-                table="event_edges",
-                values={
-                    "event_id": event.event_id,
-                    "prev_event_id": e_id,
-                    "room_id": event.room_id,
-                    "is_state": True,
-                },
-            )
+        self._simple_insert_many_txn(
+            txn,
+            table="event_edges",
+            values=[
+                {
+                    "event_id": event.event_id,
+                    "prev_event_id": e_id,
+                    "room_id": event.room_id,
+                    "is_state": True,
+                }
+                for e_id, h in event.prev_state
+            ],
+        )

         if is_new_state and not context.rejected:
             self._simple_upsert_txn(

--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py

@@ -104,18 +104,20 @@ class StateStore(SQLBaseStore):
                 },
             )

-            for state in state_events.values():
-                self._simple_insert_txn(
-                    txn,
-                    table="state_groups_state",
-                    values={
-                        "state_group": state_group,
-                        "room_id": state.room_id,
-                        "type": state.type,
-                        "state_key": state.state_key,
-                        "event_id": state.event_id,
-                    },
-                )
+            self._simple_insert_many_txn(
+                txn,
+                table="state_groups_state",
+                values=[
+                    {
+                        "state_group": state_group,
+                        "room_id": state.room_id,
+                        "type": state.type,
+                        "state_key": state.state_key,
+                        "event_id": state.event_id,
+                    }
+                    for state in state_events.values()
+                ],
+            )

         self._simple_insert_txn(
             txn,

--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py

@@ -67,7 +67,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
         self.mock_txn.execute.assert_called_with(
             "INSERT INTO tablename (columname) VALUES(?)",
-            ["Value"]
+            ("Value",)
         )

     @defer.inlineCallbacks
@@ -82,7 +82,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
         self.mock_txn.execute.assert_called_with(
             "INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)",
-            [1, 2, 3]
+            (1, 2, 3,)
         )

     @defer.inlineCallbacks
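
The expected arguments in these assertions change from lists to tuples because _simple_insert_txn now builds its parameters with zip(*values.items()), and zip always yields tuples. A quick standalone check:

    keys, vals = zip(*{"columname": "Value"}.items())
    # keys == ('columname',)
    # vals == ('Value',)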