Speed up user directory rebuild for users some more... (#15665)

1.103.0-whithout-watcha
Erik Johnston 2 years ago committed by GitHub
parent 1f55c04cbc
commit c7e9c1d5ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/15665.misc
  2. 190
      synapse/storage/databases/main/user_directory.py

@ -0,0 +1 @@
Speed up rebuilding of the user directory for local users.

@ -17,6 +17,7 @@ import re
import unicodedata
from typing import (
TYPE_CHECKING,
Collection,
Iterable,
List,
Mapping,
@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
Add all local users to the user directory.
"""
def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
sql = "SELECT user_id FROM %s LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
user_result = cast(List[Tuple[str]], txn.fetchall())
def _populate_user_directory_process_users_txn(
txn: LoggingTransaction,
) -> Optional[int]:
if self.database_engine.supports_returning:
# Note: we use an ORDER BY in the SELECT to force usage of an
# index. Otherwise, postgres does a sequential scan that is
# surprisingly slow (I think due to the fact it will read/skip
# over lots of already deleted rows).
sql = f"""
DELETE FROM {TEMP_TABLE + "_users"}
WHERE user_id IN (
SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
)
RETURNING user_id
"""
txn.execute(sql, (batch_size,))
user_result = cast(List[Tuple[str]], txn.fetchall())
else:
sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
TEMP_TABLE + "_users",
str(batch_size),
)
txn.execute(sql)
user_result = cast(List[Tuple[str]], txn.fetchall())
if not user_result:
return None
@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
assert count_result is not None
progress["remaining"] = count_result[0]
return users_to_work_on
users_to_work_on = await self.db_pool.runInteraction(
"populate_user_directory_temp_read", _get_next_batch
)
if not users_to_work_on:
return None
# No more users -- complete the transaction.
if not users_to_work_on:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
logger.debug(
"Processing the next %d users of %d remaining",
len(users_to_work_on),
progress["remaining"],
)
return 1
logger.debug(
"Processing the next %d users of %d remaining"
% (len(users_to_work_on), progress["remaining"])
)
# First filter down to users we want to insert into the user directory.
users_to_insert = [
user_id
for user_id in users_to_work_on
if await self.should_include_local_user_in_dir(user_id)
]
# First filter down to users we want to insert into the user directory.
users_to_insert = self._filter_local_users_for_dir_txn(
txn, users_to_work_on
)
# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = await self.db_pool.simple_select_many_batch(
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
desc="populate_user_directory_process_users_get_profiles",
)
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
# Next fetch their profiles. Note that the `user_id` here is the
# *localpart*, and that not all users have profiles.
profile_rows = self.db_pool.simple_select_many_txn(
txn,
table="profiles",
column="user_id",
iterable=[get_localpart_from_id(u) for u in users_to_insert],
retcols=(
"user_id",
"displayname",
"avatar_url",
),
keyvalues={},
)
for row in profile_rows
}
profiles = {
f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
f"@{row['user_id']}:{self.server_name}",
row["displayname"],
row["avatar_url"],
)
for row in profile_rows
}
profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]
profiles_to_insert = [
profiles.get(user_id) or _UserDirProfile(user_id)
for user_id in users_to_insert
]
# Actually insert the users with their profiles into the directory.
self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
# We've finished processing the users. Delete it from the table, if
# we haven't already.
if not self.database_engine.supports_returning:
self.db_pool.simple_delete_many_txn(
txn,
table=TEMP_TABLE + "_users",
column="user_id",
values=users_to_work_on,
keyvalues={},
)
# Actually insert the users with their profiles into the directory.
await self.db_pool.runInteraction(
"populate_user_directory_process_users_insertion",
self._update_profiles_in_user_dir_txn,
profiles_to_insert,
)
# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
self.db_pool.updates._background_update_progress_txn(
txn, "populate_user_directory_process_users", progress
)
return len(users_to_work_on)
# We've finished processing the users. Delete it from the table.
await self.db_pool.simple_delete_many(
table=TEMP_TABLE + "_users",
column="user_id",
iterable=users_to_work_on,
keyvalues={},
desc="populate_user_directory_process_users_delete",
processed_count = await self.db_pool.runInteraction(
"populate_user_directory_temp", _populate_user_directory_process_users_txn
)
# Update the remaining counter.
progress["remaining"] -= len(users_to_work_on)
await self.db_pool.runInteraction(
"populate_user_directory",
self.db_pool.updates._background_update_progress_txn,
"populate_user_directory_process_users",
progress,
)
# No more users -- complete the transaction.
if not processed_count:
await self.db_pool.updates._end_background_update(
"populate_user_directory_process_users"
)
return 1
return len(users_to_work_on)
return processed_count
async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return True
def _filter_local_users_for_dir_txn(
self, txn: LoggingTransaction, users: Collection[str]
) -> Collection[str]:
"""A batched version of `should_include_local_user_in_dir`"""
users = [
user
for user in users
if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined]
and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined]
]
rows = self.db_pool.simple_select_many_txn(
txn,
table="users",
column="name",
iterable=users,
keyvalues={
"deactivated": 0,
},
retcols=("name", "user_type"),
)
return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]
async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""

Loading…
Cancel
Save