Fix assertion to stop transaction queue getting wedged

... and update some docstrings to correctly reflect the types being used.

get_new_device_msgs_for_remote can return a long under some circumstances,
which was being stored in last_device_list_stream_id_by_dest, and was then
upsetting things on the next loop.
pull/4/merge
Richard van der Hoff 8 years ago
parent 3b2dd1b3c2
commit 29ed09e80a
  1. 5
      synapse/federation/transaction_queue.py
  2. 5
      synapse/replication/slave/storage/_slaved_id_tracker.py
  3. 6
      synapse/storage/deviceinbox.py
  4. 2
      synapse/storage/devices.py
  5. 14
      synapse/storage/util/id_generators.py
  6. 2
      synapse/util/caches/stream_change_cache.py

@ -99,7 +99,12 @@ class TransactionQueue(object):
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}
# destination -> stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self.last_device_stream_id_by_dest = {}
# destination -> stream_id of last successfully sent device list
# update.
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id

@ -27,4 +27,9 @@ class SlavedIdTracker(object):
self._current = (max if self.step > 0 else min)(self._current, new_id)
def get_current_token(self):
"""
Returns:
int
"""
return self._current

@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore):
"""
Args:
destination(str): The name of the remote server.
last_stream_id(int): The last position of the device message stream
last_stream_id(int|long): The last position of the device message stream
that the server sent up to.
current_stream_id(int): The current position of the device
current_stream_id(int|long): The current position of the device
message stream.
Returns:
Deferred ([dict], int): List of messages for the device and where
Deferred ([dict], int|long): List of messages for the device and where
in the stream the messages got to.
"""

@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore):
"""Get stream of updates to send to remote servers
Returns:
(now_stream_id, [ { updates }, .. ])
(int, list[dict]): current stream id and list of updates
"""
now_stream_id = self._device_list_id_gen.get_current_token()

@ -30,6 +30,17 @@ class IdGenerator(object):
def _load_current_id(db_conn, table, column, step=1):
"""
Args:
db_conn (object):
table (str):
column (str):
step (int):
Returns:
int
"""
cur = db_conn.cursor()
if step == 1:
cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
@ -131,6 +142,9 @@ class StreamIdGenerator(object):
def get_current_token(self):
"""Returns the maximum stream id such that all stream ids less than or
equal to it have been successfully persisted.
Returns:
int
"""
with self._lock:
if self._unfinished_ids:

@ -50,7 +50,7 @@ class StreamChangeCache(object):
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
assert type(stream_pos) is int
assert type(stream_pos) is int or type(stream_pos) is long
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()

Loading…
Cancel
Save