|
|
|
@ -26,6 +26,7 @@ from prometheus_client import Histogram |
|
|
|
|
from twisted.internet import defer |
|
|
|
|
|
|
|
|
|
from synapse.api.errors import StoreError |
|
|
|
|
from synapse.metrics.background_process_metrics import run_as_background_process |
|
|
|
|
from synapse.storage.engines import PostgresEngine |
|
|
|
|
from synapse.util.caches.descriptors import Cache |
|
|
|
|
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext |
|
|
|
@ -198,7 +199,12 @@ class SQLBaseStore(object): |
|
|
|
|
if self.database_engine.can_native_upsert: |
|
|
|
|
# Check ASAP (and then later, every 1s) to see if we have finished |
|
|
|
|
# background updates of tables that aren't safe to update. |
|
|
|
|
self._clock.call_later(0.0, self._check_safe_to_upsert) |
|
|
|
|
self._clock.call_later( |
|
|
|
|
0.0, |
|
|
|
|
run_as_background_process, |
|
|
|
|
"upsert_safety_check", |
|
|
|
|
self._check_safe_to_upsert |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _check_safe_to_upsert(self): |
|
|
|
@ -208,7 +214,7 @@ class SQLBaseStore(object): |
|
|
|
|
If there are background updates, we will need to wait, as they may be |
|
|
|
|
the addition of indexes that set the UNIQUE constraint that we require. |
|
|
|
|
|
|
|
|
|
If the background updates have not completed, wait a second and check again. |
|
|
|
|
If the background updates have not completed, wait 15 sec and check again. |
|
|
|
|
""" |
|
|
|
|
updates = yield self._simple_select_list( |
|
|
|
|
"background_updates", |
|
|
|
@ -221,11 +227,16 @@ class SQLBaseStore(object): |
|
|
|
|
# The User IPs table in schema #53 was missing a unique index, which we |
|
|
|
|
# run as a background update. |
|
|
|
|
if "user_ips_device_unique_index" not in updates: |
|
|
|
|
self._unsafe_to_upsert_tables.discard("user_id") |
|
|
|
|
self._unsafe_to_upsert_tables.discard("user_ips") |
|
|
|
|
|
|
|
|
|
# If there's any tables left to check, reschedule to run. |
|
|
|
|
if self._unsafe_to_upsert_tables: |
|
|
|
|
self._clock.call_later(1.0, self._check_safe_to_upsert) |
|
|
|
|
self._clock.call_later( |
|
|
|
|
15.0, |
|
|
|
|
run_as_background_process, |
|
|
|
|
"upsert_safety_check", |
|
|
|
|
self._check_safe_to_upsert |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def start_profiling(self): |
|
|
|
|
self._previous_loop_ts = self._clock.time_msec() |
|
|
|
@ -609,7 +620,7 @@ class SQLBaseStore(object): |
|
|
|
|
inserting |
|
|
|
|
lock (bool): True to lock the table when doing the upsert. |
|
|
|
|
Returns: |
|
|
|
|
Deferred(None or bool): Native upserts always return None. Emulated |
|
|
|
|
None or bool: Native upserts always return None. Emulated |
|
|
|
|
upserts return True if a new entry was created, False if an existing |
|
|
|
|
one was updated. |
|
|
|
|
""" |
|
|
|
@ -637,6 +648,18 @@ class SQLBaseStore(object): |
|
|
|
|
def _simple_upsert_txn_emulated( |
|
|
|
|
self, txn, table, keyvalues, values, insertion_values={}, lock=True |
|
|
|
|
): |
|
|
|
|
""" |
|
|
|
|
Args: |
|
|
|
|
table (str): The table to upsert into |
|
|
|
|
keyvalues (dict): The unique key tables and their new values |
|
|
|
|
values (dict): The nonunique columns and their new values |
|
|
|
|
insertion_values (dict): additional key/values to use only when |
|
|
|
|
inserting |
|
|
|
|
lock (bool): True to lock the table when doing the upsert. |
|
|
|
|
Returns: |
|
|
|
|
bool: Return True if a new entry was created, False if an existing |
|
|
|
|
one was updated. |
|
|
|
|
""" |
|
|
|
|
# We need to lock the table :(, unless we're *really* careful |
|
|
|
|
if lock: |
|
|
|
|
self.database_engine.lock_table(txn, table) |
|
|
|
|