|
|
|
@ -42,6 +42,8 @@ class UserDirectoyHandler(object): |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
INITIAL_SLEEP_MS = 50 |
|
|
|
|
INITIAL_SLEEP_COUNT = 100 |
|
|
|
|
INITIAL_BATCH_SIZE = 100 |
|
|
|
|
|
|
|
|
|
def __init__(self, hs): |
|
|
|
|
self.store = hs.get_datastore() |
|
|
|
@ -126,6 +128,7 @@ class UserDirectoyHandler(object): |
|
|
|
|
if not deltas: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
logger.info("Handling %d state deltas", len(deltas)) |
|
|
|
|
yield self._handle_deltas(deltas) |
|
|
|
|
|
|
|
|
|
self.pos = deltas[-1]["stream_id"] |
|
|
|
@ -187,9 +190,9 @@ class UserDirectoyHandler(object): |
|
|
|
|
if is_public: |
|
|
|
|
yield self.store.add_users_to_public_room( |
|
|
|
|
room_id, |
|
|
|
|
user_ids=unhandled_users - self.initially_handled_users_in_public |
|
|
|
|
user_ids=user_ids - self.initially_handled_users_in_public |
|
|
|
|
) |
|
|
|
|
self.initially_handled_users_in_public != unhandled_users |
|
|
|
|
self.initially_handled_users_in_public |= user_ids |
|
|
|
|
|
|
|
|
|
# We now go and figure out the new users who share rooms with user entries |
|
|
|
|
# We sleep aggressively here as otherwise it can starve resources. |
|
|
|
@ -198,7 +201,7 @@ class UserDirectoyHandler(object): |
|
|
|
|
to_update = set() |
|
|
|
|
count = 0 |
|
|
|
|
for user_id in user_ids: |
|
|
|
|
if count % 100 == 0: |
|
|
|
|
if count % self.INITIAL_SLEEP_COUNT == 0: |
|
|
|
|
yield sleep(self.INITIAL_SLEEP_MS / 1000.) |
|
|
|
|
|
|
|
|
|
if not self.is_mine_id(user_id): |
|
|
|
@ -213,7 +216,7 @@ class UserDirectoyHandler(object): |
|
|
|
|
if user_id == other_user_id: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
if count % 100 == 0: |
|
|
|
|
if count % self.INITIAL_SLEEP_COUNT == 0: |
|
|
|
|
yield sleep(self.INITIAL_SLEEP_MS / 1000.) |
|
|
|
|
count += 1 |
|
|
|
|
|
|
|
|
@ -234,13 +237,13 @@ class UserDirectoyHandler(object): |
|
|
|
|
else: |
|
|
|
|
self.initially_handled_users_share_private_room.add(user_set) |
|
|
|
|
|
|
|
|
|
if len(to_insert) > 100: |
|
|
|
|
if len(to_insert) > self.INITIAL_BATCH_SIZE: |
|
|
|
|
yield self.store.add_users_who_share_room( |
|
|
|
|
room_id, not is_public, to_insert, |
|
|
|
|
) |
|
|
|
|
to_insert.clear() |
|
|
|
|
|
|
|
|
|
if len(to_update) > 100: |
|
|
|
|
if len(to_update) > self.INITIAL_BATCH_SIZE: |
|
|
|
|
yield self.store.update_users_who_share_room( |
|
|
|
|
room_id, not is_public, to_update, |
|
|
|
|
) |
|
|
|
@ -298,7 +301,7 @@ class UserDirectoyHandler(object): |
|
|
|
|
room_id, self.server_name, |
|
|
|
|
) |
|
|
|
|
if not is_in_room: |
|
|
|
|
logger.debug("Server left room: %r", room_id) |
|
|
|
|
logger.info("Server left room: %r", room_id) |
|
|
|
|
# Fetch all the users that we marked as being in user |
|
|
|
|
# directory due to being in the room and then check if |
|
|
|
|
# need to remove those users or not |
|
|
|
|