@ -26,18 +26,22 @@ import contextlib
import logging
from bisect import bisect
from contextlib import contextmanager
from types import TracebackType
from typing import (
TYPE_CHECKING ,
Any ,
Awaitable ,
Callable ,
Collection ,
Dict ,
FrozenSet ,
Generator ,
Iterable ,
List ,
Optional ,
Set ,
Tuple ,
Type ,
Union ,
)
@ -240,7 +244,7 @@ class BasePresenceHandler(abc.ABC):
"""
@abc . abstractmethod
async def bump_presence_active_time ( self , user : UserID ) :
async def bump_presence_active_time ( self , user : UserID ) - > None :
""" We ' ve seen the user do something that indicates they ' re interacting
with the app .
"""
@ -274,7 +278,7 @@ class BasePresenceHandler(abc.ABC):
async def process_replication_rows (
self , stream_name : str , instance_name : str , token : int , rows : list
) :
) - > None :
""" Process streams received over replication. """
await self . _federation_queue . process_replication_rows (
stream_name , instance_name , token , rows
@ -286,7 +290,7 @@ class BasePresenceHandler(abc.ABC):
async def maybe_send_presence_to_interested_destinations (
self , states : List [ UserPresenceState ]
) :
) - > None :
""" If this instance is a federation sender, send the states to all
destinations that are interested . Filters out any states for remote
users .
@ -309,7 +313,7 @@ class BasePresenceHandler(abc.ABC):
for destination , host_states in hosts_to_states . items ( ) :
self . _federation . send_presence_to_destinations ( host_states , [ destination ] )
async def send_full_presence_to_users ( self , user_ids : Collection [ str ] ) :
async def send_full_presence_to_users ( self , user_ids : Collection [ str ] ) - > None :
"""
Adds to the list of users who should receive a full snapshot of presence
upon their next sync . Note that this only works for local users .
@ -363,7 +367,12 @@ class BasePresenceHandler(abc.ABC):
class _NullContextManager ( ContextManager [ None ] ) :
""" A context manager which does nothing. """
def __exit__ ( self , exc_type , exc_val , exc_tb ) :
def __exit__ (
self ,
exc_type : Optional [ Type [ BaseException ] ] ,
exc_val : Optional [ BaseException ] ,
exc_tb : Optional [ TracebackType ] ,
) - > None :
pass
@ -468,7 +477,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
if self . _user_to_num_current_syncs [ user_id ] == 1 :
self . mark_as_coming_online ( user_id )
def _end ( ) :
def _end ( ) - > None :
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
@ -480,7 +489,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
self . mark_as_going_offline ( user_id )
@contextlib . contextmanager
def _user_syncing ( ) :
def _user_syncing ( ) - > Generator [ None , None , None ] :
try :
yield
finally :
@ -503,7 +512,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
async def process_replication_rows (
self , stream_name : str , instance_name : str , token : int , rows : list
) :
) - > None :
await super ( ) . process_replication_rows ( stream_name , instance_name , token , rows )
if stream_name != PresenceStream . NAME :
@ -689,7 +698,7 @@ class PresenceHandler(BasePresenceHandler):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
def run_timeout_handler ( ) :
def run_timeout_handler ( ) - > Awaitable [ None ] :
return run_as_background_process (
" handle_presence_timeouts " , self . _handle_timeouts
)
@ -698,7 +707,7 @@ class PresenceHandler(BasePresenceHandler):
30 , self . clock . looping_call , run_timeout_handler , 5000
)
def run_persister ( ) :
def run_persister ( ) - > Awaitable [ None ] :
return run_as_background_process (
" persist_presence_changes " , self . _persist_unpersisted_changes
)
@ -942,8 +951,8 @@ class PresenceHandler(BasePresenceHandler):
when users disconnect / reconnect .
Args :
user_id ( str )
affect_presence ( bool ) : If false this function will be a no - op .
user_id
affect_presence : If false this function will be a no - op .
Useful for streams that are not associated with an actual
client that is being used by a user .
"""
@ -978,7 +987,7 @@ class PresenceHandler(BasePresenceHandler):
]
)
async def _end ( ) :
async def _end ( ) - > None :
try :
self . user_to_num_current_syncs [ user_id ] - = 1
@ -994,7 +1003,7 @@ class PresenceHandler(BasePresenceHandler):
logger . exception ( " Error updating presence after sync " )
@contextmanager
def _user_syncing ( ) :
def _user_syncing ( ) - > Generator [ None , None , None ] :
try :
yield
finally :
@ -1264,7 +1273,7 @@ class PresenceHandler(BasePresenceHandler):
if self . _event_processing :
return
async def _process_presence ( ) :
async def _process_presence ( ) - > None :
assert not self . _event_processing
self . _event_processing = True
@ -1513,7 +1522,7 @@ class PresenceEventSource:
room_ids : Optional [ List [ str ] ] = None ,
include_offline : bool = True ,
explicit_room_id : Optional [ str ] = None ,
* * kwargs ,
* * kwargs : Any ,
) - > Tuple [ List [ UserPresenceState ] , int ] :
# The process for getting presence events are:
# 1. Get the rooms the user is in.
@ -2074,7 +2083,7 @@ class PresenceFederationQueue:
if self . _queue_presence_updates :
self . _clock . looping_call ( self . _clear_queue , self . _CLEAR_ITEMS_EVERY_MS )
def _clear_queue ( self ) :
def _clear_queue ( self ) - > None :
""" Clear out older entries from the queue. """
clear_before = self . _clock . time_msec ( ) - self . _KEEP_ITEMS_IN_QUEUE_FOR_MS
@ -2205,7 +2214,7 @@ class PresenceFederationQueue:
async def process_replication_rows (
self , stream_name : str , instance_name : str , token : int , rows : list
) :
) - > None :
if stream_name != PresenceFederationStream . NAME :
return