Fix typing replication not being handled on master (#7959)

Handling of incoming typing stream updates from replication was not
hooked up on master, effecting set ups where typing was handled on a
different worker.

This is really only a problem if the master process is also handling
sync requests, which is unlikely for those that are at the stage of
moving typing off.

The other observable effect is that if a worker restarts or a
replication connect drops then the typing worker will issue a
`POSITION typing`, triggering master process to try and stream *all*
typing updates from position 0.

Fixes #7907
code_spécifique_watcha
Erik Johnston 4 years ago committed by GitHub
parent d8a9cd8d3e
commit 84d099ae11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/7959.bugfix
  2. 7
      synapse/app/generic_worker.py
  3. 8
      synapse/replication/tcp/client.py
  4. 3
      synapse/server.pyi

@ -0,0 +1 @@
Add experimental support for moving typing off master.

@ -87,7 +87,6 @@ from synapse.replication.tcp.streams import (
ReceiptsStream, ReceiptsStream,
TagAccountDataStream, TagAccountDataStream,
ToDeviceStream, ToDeviceStream,
TypingStream,
) )
from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events from synapse.rest.client.v1 import events
@ -644,7 +643,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
super(GenericWorkerReplicationHandler, self).__init__(hs) super(GenericWorkerReplicationHandler, self).__init__(hs)
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
@ -681,11 +679,6 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
await self.pusher_pool.on_new_receipts( await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows} token, token, {row.room_id for row in rows}
) )
elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
elif stream_name == ToDeviceStream.NAME: elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")] entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities: if entities:

@ -24,6 +24,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import TypingStream
from synapse.replication.tcp.streams.events import ( from synapse.replication.tcp.streams.events import (
EventsStream, EventsStream,
EventsStreamEventRow, EventsStreamEventRow,
@ -104,6 +105,7 @@ class ReplicationDataHandler:
self._clock = hs.get_clock() self._clock = hs.get_clock()
self._streams = hs.get_replication_streams() self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name() self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
# Map from stream to list of deferreds waiting for the stream to # Map from stream to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position. # arrive at a particular position. The lists are sorted by stream position.
@ -127,6 +129,12 @@ class ReplicationDataHandler:
""" """
self.store.process_replication_rows(stream_name, instance_name, token, rows) self.store.process_replication_rows(stream_name, instance_name, token, rows)
if stream_name == TypingStream.NAME:
self._typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
if stream_name == EventsStream.NAME: if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so # We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows. # we don't need to optimise this for multiple rows.

@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender
import synapse.state import synapse.state
import synapse.storage import synapse.storage
from synapse.events.builder import EventBuilderFactory from synapse.events.builder import EventBuilderFactory
from synapse.handlers.typing import FollowerTypingHandler
from synapse.replication.tcp.streams import Stream from synapse.replication.tcp.streams import Stream
class HomeServer(object): class HomeServer(object):
@ -150,3 +151,5 @@ class HomeServer(object):
pass pass
def should_send_federation(self) -> bool: def should_send_federation(self) -> bool:
pass pass
def get_typing_handler(self) -> FollowerTypingHandler:
pass

Loading…
Cancel
Save