|
|
|
@ -12,7 +12,7 @@ |
|
|
|
|
# See the License for the specific language governing permissions and |
|
|
|
|
# limitations under the License. |
|
|
|
|
|
|
|
|
|
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast |
|
|
|
|
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, cast |
|
|
|
|
|
|
|
|
|
from synapse.api.presence import PresenceState, UserPresenceState |
|
|
|
|
from synapse.replication.tcp.streams import PresenceStream |
|
|
|
@ -22,6 +22,7 @@ from synapse.storage.database import ( |
|
|
|
|
LoggingDatabaseConnection, |
|
|
|
|
LoggingTransaction, |
|
|
|
|
) |
|
|
|
|
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore |
|
|
|
|
from synapse.storage.engines import PostgresEngine |
|
|
|
|
from synapse.storage.types import Connection |
|
|
|
|
from synapse.storage.util.id_generators import ( |
|
|
|
@ -56,7 +57,7 @@ class PresenceBackgroundUpdateStore(SQLBaseStore): |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PresenceStore(PresenceBackgroundUpdateStore): |
|
|
|
|
class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore): |
|
|
|
|
def __init__( |
|
|
|
|
self, |
|
|
|
|
database: DatabasePool, |
|
|
|
@ -281,20 +282,30 @@ class PresenceStore(PresenceBackgroundUpdateStore): |
|
|
|
|
True if the user should have full presence sent to them, False otherwise. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
def _should_user_receive_full_presence_with_token_txn( |
|
|
|
|
txn: LoggingTransaction, |
|
|
|
|
) -> bool: |
|
|
|
|
sql = """ |
|
|
|
|
SELECT 1 FROM users_to_send_full_presence_to |
|
|
|
|
WHERE user_id = ? |
|
|
|
|
AND presence_stream_id >= ? |
|
|
|
|
""" |
|
|
|
|
txn.execute(sql, (user_id, from_token)) |
|
|
|
|
return bool(txn.fetchone()) |
|
|
|
|
token = await self._get_full_presence_stream_token_for_user(user_id) |
|
|
|
|
if token is None: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
return await self.db_pool.runInteraction( |
|
|
|
|
"should_user_receive_full_presence_with_token", |
|
|
|
|
_should_user_receive_full_presence_with_token_txn, |
|
|
|
|
return from_token <= token |
|
|
|
|
|
|
|
|
|
@cached() |
|
|
|
|
async def _get_full_presence_stream_token_for_user( |
|
|
|
|
self, user_id: str |
|
|
|
|
) -> Optional[int]: |
|
|
|
|
"""Get the presence token corresponding to the last full presence update |
|
|
|
|
for this user. |
|
|
|
|
|
|
|
|
|
If the user presents a sync token with a presence stream token at least |
|
|
|
|
as old as the result, then we need to send them a full presence update. |
|
|
|
|
|
|
|
|
|
If this user has never needed a full presence update, returns `None`. |
|
|
|
|
""" |
|
|
|
|
return await self.db_pool.simple_select_one_onecol( |
|
|
|
|
table="users_to_send_full_presence_to", |
|
|
|
|
keyvalues={"user_id": user_id}, |
|
|
|
|
retcol="presence_stream_id", |
|
|
|
|
allow_none=True, |
|
|
|
|
desc="_get_full_presence_stream_token_for_user", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> None: |
|
|
|
@ -307,18 +318,28 @@ class PresenceStore(PresenceBackgroundUpdateStore): |
|
|
|
|
# Add user entries to the table, updating the presence_stream_id column if the user already |
|
|
|
|
# exists in the table. |
|
|
|
|
presence_stream_id = self._presence_id_gen.get_current_token() |
|
|
|
|
await self.db_pool.simple_upsert_many( |
|
|
|
|
table="users_to_send_full_presence_to", |
|
|
|
|
key_names=("user_id",), |
|
|
|
|
key_values=[(user_id,) for user_id in user_ids], |
|
|
|
|
value_names=("presence_stream_id",), |
|
|
|
|
# We save the current presence stream ID token along with the user ID entry so |
|
|
|
|
# that when a user /sync's, even if they syncing multiple times across separate |
|
|
|
|
# devices at different times, each device will receive full presence once - when |
|
|
|
|
# the presence stream ID in their sync token is less than the one in the table |
|
|
|
|
# for their user ID. |
|
|
|
|
value_values=[(presence_stream_id,) for _ in user_ids], |
|
|
|
|
desc="add_users_to_send_full_presence_to", |
|
|
|
|
|
|
|
|
|
def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None: |
|
|
|
|
self.db_pool.simple_upsert_many_txn( |
|
|
|
|
txn, |
|
|
|
|
table="users_to_send_full_presence_to", |
|
|
|
|
key_names=("user_id",), |
|
|
|
|
key_values=[(user_id,) for user_id in user_ids], |
|
|
|
|
value_names=("presence_stream_id",), |
|
|
|
|
# We save the current presence stream ID token along with the user ID entry so |
|
|
|
|
# that when a user /sync's, even if they syncing multiple times across separate |
|
|
|
|
# devices at different times, each device will receive full presence once - when |
|
|
|
|
# the presence stream ID in their sync token is less than the one in the table |
|
|
|
|
# for their user ID. |
|
|
|
|
value_values=[(presence_stream_id,) for _ in user_ids], |
|
|
|
|
) |
|
|
|
|
for user_id in user_ids: |
|
|
|
|
self._invalidate_cache_and_stream( |
|
|
|
|
txn, self._get_full_presence_stream_token_for_user, (user_id,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
return await self.db_pool.runInteraction( |
|
|
|
|
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
async def get_presence_for_all_users( |
|
|
|
|