|
|
|
@ -362,21 +362,36 @@ class SyncHandler: |
|
|
|
|
# (since we now know that the device has received them) |
|
|
|
|
if since_token is not None: |
|
|
|
|
since_stream_id = since_token.to_device_key |
|
|
|
|
# Delete device messages asynchronously and in batches using the task scheduler |
|
|
|
|
await self._task_scheduler.schedule_task( |
|
|
|
|
DELETE_DEVICE_MSGS_TASK_NAME, |
|
|
|
|
resource_id=sync_config.device_id, |
|
|
|
|
params={ |
|
|
|
|
"user_id": sync_config.user.to_string(), |
|
|
|
|
"device_id": sync_config.device_id, |
|
|
|
|
"up_to_stream_id": since_stream_id, |
|
|
|
|
}, |
|
|
|
|
# Fast path: delete a limited number of to-device messages up front. |
|
|
|
|
# We do this to avoid the overhead of scheduling a task for every |
|
|
|
|
# sync. |
|
|
|
|
device_deletion_limit = 100 |
|
|
|
|
deleted = await self.store.delete_messages_for_device( |
|
|
|
|
sync_config.user.to_string(), |
|
|
|
|
sync_config.device_id, |
|
|
|
|
since_stream_id, |
|
|
|
|
limit=device_deletion_limit, |
|
|
|
|
) |
|
|
|
|
logger.debug( |
|
|
|
|
"Deletion of to-device messages up to %d scheduled", |
|
|
|
|
since_stream_id, |
|
|
|
|
"Deleted %d to-device messages up to %d", deleted, since_stream_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# If we hit the limit, schedule a background task to delete the rest. |
|
|
|
|
if deleted >= device_deletion_limit: |
|
|
|
|
await self._task_scheduler.schedule_task( |
|
|
|
|
DELETE_DEVICE_MSGS_TASK_NAME, |
|
|
|
|
resource_id=sync_config.device_id, |
|
|
|
|
params={ |
|
|
|
|
"user_id": sync_config.user.to_string(), |
|
|
|
|
"device_id": sync_config.device_id, |
|
|
|
|
"up_to_stream_id": since_stream_id, |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
logger.debug( |
|
|
|
|
"Deletion of to-device messages up to %d scheduled", |
|
|
|
|
since_stream_id, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if timeout == 0 or since_token is None or full_state: |
|
|
|
|
# we are going to return immediately, so don't bother calling |
|
|
|
|
# notifier.wait_for_events. |
|
|
|
|