@@ -65,7 +65,27 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["last_seen"],
         )
 
-        # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
+        self.register_background_update_handler(
+            "user_ips_remove_dupes",
+            self._remove_user_ip_dupes,
+        )
+
+        # Register a unique index
+        self.register_background_index_update(
+            "user_ips_device_unique_index",
+            index_name="user_ips_user_token_ip_unique_index",
+            table="user_ips",
+            columns=["user_id", "access_token", "ip"],
+            unique=True,
+        )
+
+        # Drop the old non-unique index
+        self.register_background_update_handler(
+            "user_ips_drop_nonunique_index",
+            self._remove_user_ip_nonunique,
+        )
+
+        # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
         self._batch_row_update = {}
 
         self._client_ip_looper = self._clock.looping_call(
@@ -75,6 +95,116 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             "before", "shutdown", self._update_client_ips_batch
         )
 
+    @defer.inlineCallbacks
+    def _remove_user_ip_nonunique(self, progress, batch_size):
+        def f(conn):
+            txn = conn.cursor()
+            txn.execute(
+                "DROP INDEX IF EXISTS user_ips_user_ip"
+            )
+            txn.close()
+
+        yield self.runWithConnection(f)
+        yield self._end_background_update("user_ips_drop_nonunique_index")
+        defer.returnValue(1)
+
+    @defer.inlineCallbacks
+    def _remove_user_ip_dupes(self, progress, batch_size):
+
+        last_seen_progress = progress.get("last_seen", 0)
+
+        def get_last_seen(txn):
+            txn.execute(
+                """
+                SELECT last_seen FROM user_ips
+                WHERE last_seen > ?
+                ORDER BY last_seen
+                LIMIT 1
+                OFFSET ?
+                """,
+                (last_seen_progress, batch_size)
+            )
+            results = txn.fetchone()
+            return results
+
+        # Get a last seen that's sufficiently far away from the last one
+        last_seen = yield self.runInteraction(
+            "user_ips_dups_get_last_seen", get_last_seen
+        )
+
+        if not last_seen:
+            # If we get a None then we're reaching the end and just need to
+            # delete the last batch.
+            last = True
+
+            # We fake not having an upper bound by using a future date: just
+            # the current time multiplied by two.
+            last_seen = int(self.clock.time_msec()) * 2
+        else:
+            last = False
+            last_seen = last_seen[0]
+
+        def remove(txn, last_seen_progress, last_seen):
+            # This works by looking at all entries in the given time span, and
+            # then for each (user_id, access_token, ip) tuple in that range
+            # checking for any duplicates in the rest of the table (via a join).
+            # It then only returns entries which have duplicates, and the max
+            # last_seen across all duplicates, which can then be used to delete
+            # all other duplicates.
+            # It is efficient due to the existence of (user_id, access_token,
+            # ip) and (last_seen) indices.
+            txn.execute(
+                """
+                SELECT user_id, access_token, ip,
+                       MAX(device_id), MAX(user_agent), MAX(last_seen)
+                FROM (
+                    SELECT user_id, access_token, ip
+                    FROM user_ips
+                    WHERE ? <= last_seen AND last_seen < ?
+                    ORDER BY last_seen
+                ) c
+                INNER JOIN user_ips USING (user_id, access_token, ip)
+                GROUP BY user_id, access_token, ip
+                HAVING count(*) > 1""",
+                (last_seen_progress, last_seen)
+            )
+            res = txn.fetchall()
+
+            # We've got some duplicates
+            for i in res:
+                user_id, access_token, ip, device_id, user_agent, last_seen = i
+
+                # Drop all the duplicates
+                txn.execute(
+                    """
+                    DELETE FROM user_ips
+                    WHERE user_id = ? AND access_token = ? AND ip = ?
+                    """,
+                    (user_id, access_token, ip)
+                )
+
+                # Add in one to be the last_seen
+                txn.execute(
+                    """
+                    INSERT INTO user_ips
+                    (user_id, access_token, ip, device_id, user_agent, last_seen)
+                    VALUES (?, ?, ?, ?, ?, ?)
+                    """,
+                    (user_id, access_token, ip, device_id, user_agent, last_seen)
+                )
+
+            self._background_update_progress_txn(
+                txn, "user_ips_remove_dupes", {"last_seen": last_seen}
+            )
+
+        yield self.runInteraction(
+            "user_ips_dups_remove", remove, last_seen_progress, last_seen
+        )
+        if last:
+            yield self._end_background_update("user_ips_remove_dupes")
+
+        defer.returnValue(batch_size)
+
     @defer.inlineCallbacks
     def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
                          now=None):
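
For reference, a standalone sketch (not part of the change itself) of the core idea behind _remove_user_ip_dupes above: group rows by (user_id, access_token, ip) and, for any key with more than one row, collapse the duplicates down to a single row carrying the MAX() of the remaining columns. The last_seen batching window and the self-join used by the real query are omitted here; table and column names follow the hunk above.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_ips (user_id TEXT, access_token TEXT, ip TEXT,"
    " device_id TEXT, user_agent TEXT, last_seen BIGINT)"
)
rows = [
    ("@alice:hs", "tok", "10.0.0.1", "DEV1", "UA1", 100),
    ("@alice:hs", "tok", "10.0.0.1", "DEV1", "UA2", 200),  # duplicate key
    ("@bob:hs", "tok2", "10.0.0.2", "DEV2", "UA3", 150),   # unique, left alone
]
conn.executemany("INSERT INTO user_ips VALUES (?, ?, ?, ?, ?, ?)", rows)

# Keys with duplicates, plus the MAX() of the non-key columns for each.
dupes = conn.execute(
    """
    SELECT user_id, access_token, ip,
           MAX(device_id), MAX(user_agent), MAX(last_seen)
    FROM user_ips
    GROUP BY user_id, access_token, ip
    HAVING count(*) > 1
    """
).fetchall()

# Collapse each duplicated key down to a single surviving row.
for user_id, access_token, ip, device_id, user_agent, last_seen in dupes:
    conn.execute(
        "DELETE FROM user_ips WHERE user_id = ? AND access_token = ? AND ip = ?",
        (user_id, access_token, ip),
    )
    conn.execute(
        "INSERT INTO user_ips VALUES (?, ?, ?, ?, ?, ?)",
        (user_id, access_token, ip, device_id, user_agent, last_seen),
    )

print(conn.execute("SELECT * FROM user_ips ORDER BY user_id").fetchall())
# [('@alice:hs', 'tok', '10.0.0.1', 'DEV1', 'UA2', 200),
#  ('@bob:hs', 'tok2', '10.0.0.2', 'DEV2', 'UA3', 150)]
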
@@ -127,10 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                     "user_id": user_id,
                     "access_token": access_token,
                     "ip": ip,
-                    "user_agent": user_agent,
-                    "device_id": device_id,
                 },
                 values={
+                    "user_agent": user_agent,
+                    "device_id": device_id,
                     "last_seen": last_seen,
                 },
                 lock=False,
@@ -227,7 +357,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         results = {}
 
         for key in self._batch_row_update:
-            uid, access_token, ip = key
+            uid, access_token, ip, = key
             if uid == user_id:
                 user_agent, _, last_seen = self._batch_row_update[key]
                 results[(access_token, ip)] = (user_agent, last_seen)
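
One note on the change above that moves user_agent and device_id out of the upsert's key columns and into values={...}: with the new unique index on (user_id, access_token, ip), the upsert key has to line up with exactly those three columns, and user_agent/device_id become plain updated values. As a hypothetical standalone sketch of an upsert keyed that way (native SQLite ON CONFLICT, shown purely for illustration; it is not how Synapse's upsert helper is implemented):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_ips (user_id TEXT, access_token TEXT, ip TEXT,"
    " user_agent TEXT, device_id TEXT, last_seen BIGINT)"
)
conn.execute(
    "CREATE UNIQUE INDEX user_ips_user_token_ip_unique_index"
    " ON user_ips (user_id, access_token, ip)"
)

def upsert_client_ip(user_id, access_token, ip, user_agent, device_id, last_seen):
    # Native upsert keyed on the unique index; needs SQLite >= 3.24.
    conn.execute(
        """
        INSERT INTO user_ips
            (user_id, access_token, ip, user_agent, device_id, last_seen)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT (user_id, access_token, ip) DO UPDATE SET
            user_agent = excluded.user_agent,
            device_id = excluded.device_id,
            last_seen = excluded.last_seen
        """,
        (user_id, access_token, ip, user_agent, device_id, last_seen),
    )

upsert_client_ip("@alice:hs", "tok", "10.0.0.1", "UA1", "DEV1", 100)
upsert_client_ip("@alice:hs", "tok", "10.0.0.1", "UA2", "DEV1", 200)
print(conn.execute("SELECT count(*), MAX(last_seen) FROM user_ips").fetchone())  # (1, 200)
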