|
|
@ -291,33 +291,33 @@ class SQLBaseStore(object): |
|
|
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
@defer.inlineCallbacks |
|
|
|
def runInteraction(self, desc, func, *args, **kwargs): |
|
|
|
def runInteraction(self, desc, func, *args, **kwargs): |
|
|
|
"""Wraps the .runInteraction() method on the underlying db_pool.""" |
|
|
|
"""Starts a transaction on the database and runs a given function |
|
|
|
current_context = LoggingContext.current_context() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
start_time = time.time() * 1000 |
|
|
|
Arguments: |
|
|
|
|
|
|
|
desc (str): description of the transaction, for logging and metrics |
|
|
|
|
|
|
|
func (func): callback function, which will be called with a |
|
|
|
|
|
|
|
database transaction (twisted.enterprise.adbapi.Transaction) as |
|
|
|
|
|
|
|
its first argument, followed by `args` and `kwargs`. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
args (list): positional args to pass to `func` |
|
|
|
|
|
|
|
kwargs (dict): named args to pass to `func` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
|
|
|
Deferred: The result of func |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
current_context = LoggingContext.current_context() |
|
|
|
|
|
|
|
|
|
|
|
after_callbacks = [] |
|
|
|
after_callbacks = [] |
|
|
|
final_callbacks = [] |
|
|
|
final_callbacks = [] |
|
|
|
|
|
|
|
|
|
|
|
def inner_func(conn, *args, **kwargs): |
|
|
|
def inner_func(conn, *args, **kwargs): |
|
|
|
with LoggingContext("runInteraction") as context: |
|
|
|
|
|
|
|
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.database_engine.is_connection_closed(conn): |
|
|
|
|
|
|
|
logger.debug("Reconnecting closed database connection") |
|
|
|
|
|
|
|
conn.reconnect() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
current_context.copy_to(context) |
|
|
|
|
|
|
|
return self._new_transaction( |
|
|
|
return self._new_transaction( |
|
|
|
conn, desc, after_callbacks, final_callbacks, current_context, |
|
|
|
conn, desc, after_callbacks, final_callbacks, current_context, |
|
|
|
func, *args, **kwargs |
|
|
|
func, *args, **kwargs |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
try: |
|
|
|
with PreserveLoggingContext(): |
|
|
|
result = yield self.runWithConnection(inner_func, *args, **kwargs) |
|
|
|
result = yield self._db_pool.runWithConnection( |
|
|
|
|
|
|
|
inner_func, *args, **kwargs |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for after_callback, after_args, after_kwargs in after_callbacks: |
|
|
|
for after_callback, after_args, after_kwargs in after_callbacks: |
|
|
|
after_callback(*after_args, **after_kwargs) |
|
|
|
after_callback(*after_args, **after_kwargs) |
|
|
@ -329,7 +329,18 @@ class SQLBaseStore(object): |
|
|
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
@defer.inlineCallbacks |
|
|
|
def runWithConnection(self, func, *args, **kwargs): |
|
|
|
def runWithConnection(self, func, *args, **kwargs): |
|
|
|
"""Wraps the .runInteraction() method on the underlying db_pool.""" |
|
|
|
"""Wraps the .runWithConnection() method on the underlying db_pool. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Arguments: |
|
|
|
|
|
|
|
func (func): callback function, which will be called with a |
|
|
|
|
|
|
|
database connection (twisted.enterprise.adbapi.Connection) as |
|
|
|
|
|
|
|
its first argument, followed by `args` and `kwargs`. |
|
|
|
|
|
|
|
args (list): positional args to pass to `func` |
|
|
|
|
|
|
|
kwargs (dict): named args to pass to `func` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
|
|
|
Deferred: The result of func |
|
|
|
|
|
|
|
""" |
|
|
|
current_context = LoggingContext.current_context() |
|
|
|
current_context = LoggingContext.current_context() |
|
|
|
|
|
|
|
|
|
|
|
start_time = time.time() * 1000 |
|
|
|
start_time = time.time() * 1000 |
|
|
|