|
|
|
@ -15,6 +15,7 @@ |
|
|
|
|
import logging |
|
|
|
|
from typing import List, Optional, Tuple |
|
|
|
|
|
|
|
|
|
from synapse.logging import issue9533_logger |
|
|
|
|
from synapse.logging.opentracing import log_kv, set_tag, trace |
|
|
|
|
from synapse.replication.tcp.streams import ToDeviceStream |
|
|
|
|
from synapse.storage._base import SQLBaseStore, db_to_json |
|
|
|
@ -404,6 +405,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if remote_messages_by_destination: |
|
|
|
|
issue9533_logger.debug( |
|
|
|
|
"Queued outgoing to-device messages with stream_id %i for %s", |
|
|
|
|
stream_id, |
|
|
|
|
list(remote_messages_by_destination.keys()), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
async with self._device_inbox_id_gen.get_next() as stream_id: |
|
|
|
|
now_ms = self.clock.time_msec() |
|
|
|
|
await self.db_pool.runInteraction( |
|
|
|
@ -533,6 +541,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
issue9533_logger.debug( |
|
|
|
|
"Stored to-device messages with stream_id %i for %s", |
|
|
|
|
stream_id, |
|
|
|
|
[ |
|
|
|
|
(user_id, device_id) |
|
|
|
|
for (user_id, messages_by_device) in local_by_user_then_device.items() |
|
|
|
|
for device_id in messages_by_device.keys() |
|
|
|
|
], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DeviceInboxBackgroundUpdateStore(SQLBaseStore): |
|
|
|
|
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" |
|
|
|
|