@@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
 
 
 class DeviceStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(DeviceStore, self).__init__(hs)
+
+        self._clock.looping_call(
+            self._prune_old_outbound_device_pokes, 60 * 60 * 1000
+        )
+
     @defer.inlineCallbacks
     def store_device(self, user_id, device_id,
                      initial_device_display_name):
@@ -530,3 +537,38 @@ class DeviceStore(SQLBaseStore):
 
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
+
+    def _prune_old_outbound_device_pokes(self):
+        """Delete old entries out of the device_lists_outbound_pokes to ensure
+        that we don't fill up due to dead servers. We keep one entry per
+        (destination, user_id) tuple to ensure that the prev_ids remain correct
+        if the server does come back.
+        """
+        now = self._clock.time_msec()
+
+        def _prune_txn(txn):
+            select_sql = """
+                SELECT destination, user_id, max(stream_id) as stream_id
+                FROM device_lists_outbound_pokes
+                GROUP BY destination, user_id
+            """
+
+            txn.execute(select_sql)
+            rows = txn.fetchall()
+
+            delete_sql = """
+                DELETE FROM device_lists_outbound_pokes
+                WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
+            """
+
+            txn.executemany(
+                delete_sql,
+                (
+                    (now, destination, user_id, stream_id)
+                    for destination, user_id, stream_id in rows
+                )
+            )
+
+        return self.runInteraction(
+            "_prune_old_outbound_device_pokes", _prune_txn
+        )
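
To illustrate what the new pruning query pair does, here is a minimal, self-contained sketch against an in-memory SQLite table. The table is cut down to just the columns the two statements touch (destination, user_id, stream_id, ts), and the sample rows and server names are made up for the example; this is not the real Synapse schema or storage API.

    import sqlite3
    import time

    # Toy stand-in for device_lists_outbound_pokes, reduced to the columns the
    # pruning queries touch. Illustrative only, not the real Synapse schema.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE device_lists_outbound_pokes "
        "(destination TEXT, user_id TEXT, stream_id INTEGER, ts INTEGER)"
    )

    now = int(time.time() * 1000)
    old = now - 48 * 60 * 60 * 1000  # pokes queued two days ago

    # Several pokes queued for servers that never acknowledged them.
    conn.executemany(
        "INSERT INTO device_lists_outbound_pokes VALUES (?, ?, ?, ?)",
        [
            ("remote.example.com", "@alice:example.org", 1, old),
            ("remote.example.com", "@alice:example.org", 2, old),
            ("remote.example.com", "@alice:example.org", 3, old),
            ("other.example.net", "@bob:example.org", 5, old),
        ],
    )

    # Same pattern as the diff: find the newest stream_id per
    # (destination, user_id), then delete every older poke for that pair.
    rows = conn.execute(
        "SELECT destination, user_id, max(stream_id) AS stream_id "
        "FROM device_lists_outbound_pokes GROUP BY destination, user_id"
    ).fetchall()

    conn.executemany(
        "DELETE FROM device_lists_outbound_pokes "
        "WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?",
        [(now, dest, user, stream) for dest, user, stream in rows],
    )

    # Only the newest poke per (destination, user_id) survives: stream_ids 3 and 5.
    print(conn.execute(
        "SELECT destination, user_id, stream_id FROM device_lists_outbound_pokes"
    ).fetchall())

Because the DELETE keeps the row with the highest stream_id for each (destination, user_id), a server that later comes back still sees correct prev_ids, while the hourly looping_call keeps the table from growing without bound for servers that never return.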