|
|
|
@ -208,11 +208,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): |
|
|
|
|
class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore): |
|
|
|
|
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" |
|
|
|
|
|
|
|
|
|
def __init__(self, db_conn, hs): |
|
|
|
|
super(DeviceInboxStore, self).__init__(db_conn, hs) |
|
|
|
|
super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs) |
|
|
|
|
|
|
|
|
|
self.register_background_index_update( |
|
|
|
|
"device_inbox_stream_index", |
|
|
|
@ -225,6 +225,26 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): |
|
|
|
|
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _background_drop_index_device_inbox(self, progress, batch_size): |
|
|
|
|
def reindex_txn(conn): |
|
|
|
|
txn = conn.cursor() |
|
|
|
|
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") |
|
|
|
|
txn.close() |
|
|
|
|
|
|
|
|
|
yield self.runWithConnection(reindex_txn) |
|
|
|
|
|
|
|
|
|
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) |
|
|
|
|
|
|
|
|
|
return 1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): |
|
|
|
|
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" |
|
|
|
|
|
|
|
|
|
def __init__(self, db_conn, hs): |
|
|
|
|
super(DeviceInboxStore, self).__init__(db_conn, hs) |
|
|
|
|
|
|
|
|
|
# Map of (user_id, device_id) to the last stream_id that has been |
|
|
|
|
# deleted up to. This is so that we can no op deletions. |
|
|
|
|
self._last_device_delete_cache = ExpiringCache( |
|
|
|
@ -435,16 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): |
|
|
|
|
return self.runInteraction( |
|
|
|
|
"get_all_new_device_messages", get_all_new_device_messages_txn |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _background_drop_index_device_inbox(self, progress, batch_size): |
|
|
|
|
def reindex_txn(conn): |
|
|
|
|
txn = conn.cursor() |
|
|
|
|
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") |
|
|
|
|
txn.close() |
|
|
|
|
|
|
|
|
|
yield self.runWithConnection(reindex_txn) |
|
|
|
|
|
|
|
|
|
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) |
|
|
|
|
|
|
|
|
|
return 1 |
|
|
|
|