|
|
@ -98,7 +98,7 @@ class FederationRemoteSendQueue(object): |
|
|
|
now = self.clock.time_msec() |
|
|
|
now = self.clock.time_msec() |
|
|
|
|
|
|
|
|
|
|
|
keys = self.pos_time.keys() |
|
|
|
keys = self.pos_time.keys() |
|
|
|
time = keys.bisect_left(now - FIVE_MINUTES_AGO) |
|
|
|
time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO) |
|
|
|
if not keys[:time]: |
|
|
|
if not keys[:time]: |
|
|
|
return |
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
@ -113,7 +113,7 @@ class FederationRemoteSendQueue(object): |
|
|
|
with Measure(self.clock, "send_queue._clear"): |
|
|
|
with Measure(self.clock, "send_queue._clear"): |
|
|
|
# Delete things out of presence maps |
|
|
|
# Delete things out of presence maps |
|
|
|
keys = self.presence_changed.keys() |
|
|
|
keys = self.presence_changed.keys() |
|
|
|
i = keys.bisect_left(position_to_delete) |
|
|
|
i = self.presence_changed.bisect_left(position_to_delete) |
|
|
|
for key in keys[:i]: |
|
|
|
for key in keys[:i]: |
|
|
|
del self.presence_changed[key] |
|
|
|
del self.presence_changed[key] |
|
|
|
|
|
|
|
|
|
|
@ -131,7 +131,7 @@ class FederationRemoteSendQueue(object): |
|
|
|
|
|
|
|
|
|
|
|
# Delete things out of keyed edus |
|
|
|
# Delete things out of keyed edus |
|
|
|
keys = self.keyed_edu_changed.keys() |
|
|
|
keys = self.keyed_edu_changed.keys() |
|
|
|
i = keys.bisect_left(position_to_delete) |
|
|
|
i = self.keyed_edu_changed.bisect_left(position_to_delete) |
|
|
|
for key in keys[:i]: |
|
|
|
for key in keys[:i]: |
|
|
|
del self.keyed_edu_changed[key] |
|
|
|
del self.keyed_edu_changed[key] |
|
|
|
|
|
|
|
|
|
|
@ -145,19 +145,19 @@ class FederationRemoteSendQueue(object): |
|
|
|
|
|
|
|
|
|
|
|
# Delete things out of edu map |
|
|
|
# Delete things out of edu map |
|
|
|
keys = self.edus.keys() |
|
|
|
keys = self.edus.keys() |
|
|
|
i = keys.bisect_left(position_to_delete) |
|
|
|
i = self.edus.bisect_left(position_to_delete) |
|
|
|
for key in keys[:i]: |
|
|
|
for key in keys[:i]: |
|
|
|
del self.edus[key] |
|
|
|
del self.edus[key] |
|
|
|
|
|
|
|
|
|
|
|
# Delete things out of failure map |
|
|
|
# Delete things out of failure map |
|
|
|
keys = self.failures.keys() |
|
|
|
keys = self.failures.keys() |
|
|
|
i = keys.bisect_left(position_to_delete) |
|
|
|
i = self.failures.bisect_left(position_to_delete) |
|
|
|
for key in keys[:i]: |
|
|
|
for key in keys[:i]: |
|
|
|
del self.failures[key] |
|
|
|
del self.failures[key] |
|
|
|
|
|
|
|
|
|
|
|
# Delete things out of device map |
|
|
|
# Delete things out of device map |
|
|
|
keys = self.device_messages.keys() |
|
|
|
keys = self.device_messages.keys() |
|
|
|
i = keys.bisect_left(position_to_delete) |
|
|
|
i = self.device_messages.bisect_left(position_to_delete) |
|
|
|
for key in keys[:i]: |
|
|
|
for key in keys[:i]: |
|
|
|
del self.device_messages[key] |
|
|
|
del self.device_messages[key] |
|
|
|
|
|
|
|
|
|
|
@ -250,13 +250,12 @@ class FederationRemoteSendQueue(object): |
|
|
|
self._clear_queue_before_pos(federation_ack) |
|
|
|
self._clear_queue_before_pos(federation_ack) |
|
|
|
|
|
|
|
|
|
|
|
# Fetch changed presence |
|
|
|
# Fetch changed presence |
|
|
|
keys = self.presence_changed.keys() |
|
|
|
i = self.presence_changed.bisect_right(from_token) |
|
|
|
i = keys.bisect_right(from_token) |
|
|
|
j = self.presence_changed.bisect_right(to_token) + 1 |
|
|
|
j = keys.bisect_right(to_token) + 1 |
|
|
|
|
|
|
|
dest_user_ids = [ |
|
|
|
dest_user_ids = [ |
|
|
|
(pos, user_id) |
|
|
|
(pos, user_id) |
|
|
|
for pos in keys[i:j] |
|
|
|
for pos, user_id_list in self.presence_changed.items()[i:j] |
|
|
|
for user_id in self.presence_changed[pos] |
|
|
|
for user_id in user_id_list |
|
|
|
] |
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
for (key, user_id) in dest_user_ids: |
|
|
|
for (key, user_id) in dest_user_ids: |
|
|
@ -265,13 +264,12 @@ class FederationRemoteSendQueue(object): |
|
|
|
))) |
|
|
|
))) |
|
|
|
|
|
|
|
|
|
|
|
# Fetch changes keyed edus |
|
|
|
# Fetch changes keyed edus |
|
|
|
keys = self.keyed_edu_changed.keys() |
|
|
|
i = self.keyed_edu_changed.bisect_right(from_token) |
|
|
|
i = keys.bisect_right(from_token) |
|
|
|
j = self.keyed_edu_changed.bisect_right(to_token) + 1 |
|
|
|
j = keys.bisect_right(to_token) + 1 |
|
|
|
|
|
|
|
# We purposefully clobber based on the key here, python dict comprehensions |
|
|
|
# We purposefully clobber based on the key here, python dict comprehensions |
|
|
|
# always use the last value, so this will correctly point to the last |
|
|
|
# always use the last value, so this will correctly point to the last |
|
|
|
# stream position. |
|
|
|
# stream position. |
|
|
|
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} |
|
|
|
keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]} |
|
|
|
|
|
|
|
|
|
|
|
for ((destination, edu_key), pos) in iteritems(keyed_edus): |
|
|
|
for ((destination, edu_key), pos) in iteritems(keyed_edus): |
|
|
|
rows.append((pos, KeyedEduRow( |
|
|
|
rows.append((pos, KeyedEduRow( |
|
|
@ -280,19 +278,17 @@ class FederationRemoteSendQueue(object): |
|
|
|
))) |
|
|
|
))) |
|
|
|
|
|
|
|
|
|
|
|
# Fetch changed edus |
|
|
|
# Fetch changed edus |
|
|
|
keys = self.edus.keys() |
|
|
|
i = self.edus.bisect_right(from_token) |
|
|
|
i = keys.bisect_right(from_token) |
|
|
|
j = self.edus.bisect_right(to_token) + 1 |
|
|
|
j = keys.bisect_right(to_token) + 1 |
|
|
|
edus = self.edus.items()[i:j] |
|
|
|
edus = ((k, self.edus[k]) for k in keys[i:j]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (pos, edu) in edus: |
|
|
|
for (pos, edu) in edus: |
|
|
|
rows.append((pos, EduRow(edu))) |
|
|
|
rows.append((pos, EduRow(edu))) |
|
|
|
|
|
|
|
|
|
|
|
# Fetch changed failures |
|
|
|
# Fetch changed failures |
|
|
|
keys = self.failures.keys() |
|
|
|
i = self.failures.bisect_right(from_token) |
|
|
|
i = keys.bisect_right(from_token) |
|
|
|
j = self.failures.bisect_right(to_token) + 1 |
|
|
|
j = keys.bisect_right(to_token) + 1 |
|
|
|
failures = self.failures.items()[i:j] |
|
|
|
failures = ((k, self.failures[k]) for k in keys[i:j]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (pos, (destination, failure)) in failures: |
|
|
|
for (pos, (destination, failure)) in failures: |
|
|
|
rows.append((pos, FailureRow( |
|
|
|
rows.append((pos, FailureRow( |
|
|
@ -301,10 +297,9 @@ class FederationRemoteSendQueue(object): |
|
|
|
))) |
|
|
|
))) |
|
|
|
|
|
|
|
|
|
|
|
# Fetch changed device messages |
|
|
|
# Fetch changed device messages |
|
|
|
keys = self.device_messages.keys() |
|
|
|
i = self.device_messages.bisect_right(from_token) |
|
|
|
i = keys.bisect_right(from_token) |
|
|
|
j = self.device_messages.bisect_right(to_token) + 1 |
|
|
|
j = keys.bisect_right(to_token) + 1 |
|
|
|
device_messages = {v: k for k, v in self.device_messages.items()[i:j]} |
|
|
|
device_messages = {self.device_messages[k]: k for k in keys[i:j]} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (destination, pos) in iteritems(device_messages): |
|
|
|
for (destination, pos) in iteritems(device_messages): |
|
|
|
rows.append((pos, DeviceRow( |
|
|
|
rows.append((pos, DeviceRow( |
|
|
|