@@ -15,6 +15,7 @@
 
 from ._base import SQLBaseStore
 from twisted.internet import defer
+from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.types import RoomStreamToken
 from .stream import lower_bound
@@ -29,7 +30,6 @@ class EventPushActionsStore(SQLBaseStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
 
     def __init__(self, hs):
-        self.stream_ordering_month_ago = None
         super(EventPushActionsStore, self).__init__(hs)
 
         self.register_background_index_update(
@@ -47,6 +47,9 @@ class EventPushActionsStore(SQLBaseStore):
             where_clause="highlight=1"
         )
 
+        self._doing_notif_rotation = False
+        self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000)
+
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         Args:
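The two lines added above wire up the background job: a re-entrancy guard plus a timer that fires _rotate_notifs every 30 minutes (Synapse's clock takes milliseconds, and 30 * 60 * 1000 ms is half an hour). A rough standalone sketch of the same pattern, assuming plain Twisted's task.LoopingCall in place of Synapse's Clock.looping_call wrapper, with the rotation step stubbed out:

# Sketch of the scheduling pattern only, assuming plain Twisted rather than
# Synapse's Clock wrapper. The real patch calls
# self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000).
from twisted.internet import defer, task, reactor

class Rotator(object):
    def __init__(self):
        self._doing_notif_rotation = False  # re-entrancy guard

    @defer.inlineCallbacks
    def rotate_notifs(self):
        if self._doing_notif_rotation:
            return  # a previous run is still going; skip this tick
        self._doing_notif_rotation = True
        try:
            caught_up = False
            while not caught_up:
                caught_up = yield self._rotate_some()  # stub for one txn's work
                if not caught_up:
                    yield task.deferLater(reactor, 5, lambda: None)  # back off 5s
        finally:
            self._doing_notif_rotation = False

    def _rotate_some(self):
        # Placeholder: the real code runs _rotate_notifs_txn in a DB transaction.
        return defer.succeed(True)

rotator = Rotator()
loop = task.LoopingCall(rotator.rotate_notifs)
loop.start(30 * 60)               # interval in seconds; fires immediately, then twice an hour
reactor.callLater(1, reactor.stop)
reactor.run()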
@@ -77,66 +80,89 @@ class EventPushActionsStore(SQLBaseStore):
     def get_unread_event_push_actions_by_room_for_user(
         self, room_id, user_id, last_read_event_id
     ):
-        def _get_unread_event_push_actions_by_room(txn):
-            sql = (
-                "SELECT stream_ordering, topological_ordering"
-                " FROM events"
-                " WHERE room_id = ? AND event_id = ?"
-            )
-            txn.execute(
-                sql, (room_id, last_read_event_id)
-            )
-            results = txn.fetchall()
-            if len(results) == 0:
-                return {"notify_count": 0, "highlight_count": 0}
-
-            stream_ordering = results[0][0]
-            topological_ordering = results[0][1]
-            token = RoomStreamToken(
-                topological_ordering, stream_ordering
-            )
-
-            # First get number of notifications.
-            # We don't need to put a notif=1 clause as all rows always have
-            # notif=1
-            sql = (
-                "SELECT count(*)"
-                " FROM event_push_actions ea"
-                " WHERE"
-                " user_id = ?"
-                " AND room_id = ?"
-                " AND %s"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            txn.execute(sql, (user_id, room_id))
-            row = txn.fetchone()
-            notify_count = row[0] if row else 0
-
-            # Now get the number of highlights
-            sql = (
-                "SELECT count(*)"
-                " FROM event_push_actions ea"
-                " WHERE"
-                " highlight = 1"
-                " AND user_id = ?"
-                " AND room_id = ?"
-                " AND %s"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            txn.execute(sql, (user_id, room_id))
-            row = txn.fetchone()
-            highlight_count = row[0] if row else 0
-
-            return {
-                "notify_count": notify_count,
-                "highlight_count": highlight_count,
-            }
-
-        ret = yield self.runInteraction(
-            "get_unread_event_push_actions_by_room",
-            _get_unread_event_push_actions_by_room
-        )
-        defer.returnValue(ret)
+        ret = yield self.runInteraction(
+            "get_unread_event_push_actions_by_room",
+            self._get_unread_counts_by_receipt_txn,
+            room_id, user_id, last_read_event_id
+        )
+        defer.returnValue(ret)
+
+    def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
+                                          last_read_event_id):
+        sql = (
+            "SELECT stream_ordering, topological_ordering"
+            " FROM events"
+            " WHERE room_id = ? AND event_id = ?"
+        )
+        txn.execute(
+            sql, (room_id, last_read_event_id)
+        )
+        results = txn.fetchall()
+        if len(results) == 0:
+            return {"notify_count": 0, "highlight_count": 0}
+
+        stream_ordering = results[0][0]
+        topological_ordering = results[0][1]
+
+        return self._get_unread_counts_by_pos_txn(
+            txn, room_id, user_id, topological_ordering, stream_ordering
+        )
+
+    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
+                                      stream_ordering):
+        token = RoomStreamToken(
+            topological_ordering, stream_ordering
+        )
+
+        # First get number of notifications.
+        # We don't need to put a notif=1 clause as all rows always have
+        # notif=1
+        sql = (
+            "SELECT count(*)"
+            " FROM event_push_actions ea"
+            " WHERE"
+            " user_id = ?"
+            " AND room_id = ?"
+            " AND %s"
+        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+        txn.execute(sql, (user_id, room_id))
+        row = txn.fetchone()
+        notify_count = row[0] if row else 0
+
+        summary_notif_count = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary",
+            keyvalues={
+                "user_id": user_id,
+                "room_id": room_id,
+            },
+            retcol="notif_count",
+            allow_none=True,
+        )
+
+        if summary_notif_count:
+            notify_count += summary_notif_count
+
+        # Now get the number of highlights
+        sql = (
+            "SELECT count(*)"
+            " FROM event_push_actions ea"
+            " WHERE"
+            " highlight = 1"
+            " AND user_id = ?"
+            " AND room_id = ?"
+            " AND %s"
+        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+        txn.execute(sql, (user_id, room_id))
+        row = txn.fetchone()
+        highlight_count = row[0] if row else 0
+
+        return {
+            "notify_count": notify_count,
+            "highlight_count": highlight_count,
+        }
 
     @defer.inlineCallbacks
     def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
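The refactored lookup above now assembles the unread count from two places: whatever has already been rotated into event_push_summary, plus a COUNT(*) over the per-event rows newer than the user's read receipt; highlights are never rotated and are always counted directly. A minimal sqlite3 sketch of that combination (the schema is trimmed, the sample rows are invented, and lower_bound's topological/stream token comparison is simplified to a plain stream_ordering comparison):

# Minimal sketch using sqlite3 rather than Synapse's storage layer. Table and
# column names follow the patch; the sample data is made up for illustration,
# and the read-receipt comparison is simplified to stream_ordering only.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE event_push_actions (
        user_id TEXT, room_id TEXT, stream_ordering INTEGER,
        highlight INTEGER, notif INTEGER
    );
    CREATE TABLE event_push_summary (
        user_id TEXT, room_id TEXT, notif_count INTEGER, stream_ordering INTEGER
    );
    -- two un-rotated notifications after the receipt, one of them a highlight
    INSERT INTO event_push_actions VALUES
        ('@u:hs', '!r:hs', 101, 0, 1),
        ('@u:hs', '!r:hs', 102, 1, 1);
    -- five older notifications already rotated into the summary table
    INSERT INTO event_push_summary VALUES ('@u:hs', '!r:hs', 5, 90);
""")

def unread_counts(txn, room_id, user_id, receipt_stream_ordering):
    # Residual per-event notifications newer than the read receipt.
    txn.execute(
        "SELECT count(*) FROM event_push_actions"
        " WHERE user_id = ? AND room_id = ? AND stream_ordering > ?",
        (user_id, room_id, receipt_stream_ordering),
    )
    notify_count = txn.fetchone()[0]

    # Add whatever has already been rotated into the summary row, if any.
    txn.execute(
        "SELECT notif_count FROM event_push_summary"
        " WHERE user_id = ? AND room_id = ?",
        (user_id, room_id),
    )
    row = txn.fetchone()
    if row:
        notify_count += row[0]

    # Highlights are never rotated, so they are counted directly.
    txn.execute(
        "SELECT count(*) FROM event_push_actions"
        " WHERE highlight = 1 AND user_id = ? AND room_id = ?"
        " AND stream_ordering > ?",
        (user_id, room_id, receipt_stream_ordering),
    )
    highlight_count = txn.fetchone()[0]
    return {"notify_count": notify_count, "highlight_count": highlight_count}

print(unread_counts(conn.cursor(), "!r:hs", "@u:hs", 100))
# -> {'notify_count': 7, 'highlight_count': 1}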
@@ -448,7 +474,7 @@ class EventPushActionsStore(SQLBaseStore):
         )
 
     def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
-                                            topological_ordering):
+                                            topological_ordering, stream_ordering):
         """
         Purges old push actions for a user and room before a given
         topological_ordering.
@@ -479,11 +505,16 @@ class EventPushActionsStore(SQLBaseStore):
         txn.execute(
             "DELETE FROM event_push_actions "
             " WHERE user_id = ? AND room_id = ? AND "
-            " topological_ordering < ?"
+            " topological_ordering <= ?"
             " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
             (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
         )
 
+        txn.execute("""
+            DELETE FROM event_push_summary
+            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
+        """, (room_id, user_id, stream_ordering))
+
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
         yield self.runInteraction(
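The purge above now also takes the receipt's stream_ordering: non-highlight actions at or before the read position are deleted outright, highlights are only dropped once they are more than a month old, and the user's summary row is removed up to the receipt so it cannot double-count. A compact sqlite3 sketch of just those two DELETEs (the sample values and the month-ago cutoff are invented):

# Sketch of the two deletes run when a read receipt comes in, using sqlite3 in
# place of Synapse's transaction helpers; sample rows and cutoffs are invented.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE event_push_actions (
        user_id TEXT, room_id TEXT, topological_ordering INTEGER,
        stream_ordering INTEGER, highlight INTEGER
    );
    CREATE TABLE event_push_summary (
        user_id TEXT, room_id TEXT, notif_count INTEGER, stream_ordering INTEGER
    );
    INSERT INTO event_push_actions VALUES
        ('@u:hs', '!r:hs', 10, 100, 0),   -- read, plain notif: deleted
        ('@u:hs', '!r:hs', 10, 101, 1),   -- read, recent highlight: kept
        ('@u:hs', '!r:hs', 11, 102, 0);   -- unread: kept
    INSERT INTO event_push_summary VALUES ('@u:hs', '!r:hs', 5, 90);
""")

def remove_old_push_actions_before(txn, room_id, user_id,
                                   topological_ordering, stream_ordering,
                                   stream_ordering_month_ago):
    # Non-highlights up to the receipt go; highlights only go once their
    # stream_ordering has fallen below the month-ago cutoff.
    txn.execute(
        "DELETE FROM event_push_actions"
        " WHERE user_id = ? AND room_id = ?"
        " AND topological_ordering <= ?"
        " AND ((stream_ordering < ? AND highlight = 1) OR highlight = 0)",
        (user_id, room_id, topological_ordering, stream_ordering_month_ago),
    )
    # The rotated summary is now stale up to the receipt as well.
    txn.execute(
        "DELETE FROM event_push_summary"
        " WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?",
        (room_id, user_id, stream_ordering),
    )

cur = conn.cursor()
remove_old_push_actions_before(cur, "!r:hs", "@u:hs",
                               topological_ordering=10, stream_ordering=101,
                               stream_ordering_month_ago=50)
print(cur.execute("SELECT count(*) FROM event_push_actions").fetchone()[0])  # 2
print(cur.execute("SELECT count(*) FROM event_push_summary").fetchone()[0])  # 0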
@@ -500,6 +531,14 @@ class EventPushActionsStore(SQLBaseStore):
                 "Found stream ordering 1 month ago: it's %d",
                 self.stream_ordering_month_ago
             )
+            logger.info("Searching for stream ordering 1 day ago")
+            self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+                txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+            )
+            logger.info(
+                "Found stream ordering 1 day ago: it's %d",
+                self.stream_ordering_day_ago
+            )
 
     def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
         """
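stream_ordering_day_ago is computed like the existing month-ago marker: find the first stream ordering whose event was received after now minus 24 hours. The helper's body sits outside this hunk (only its `return range_end` tail appears as context below), but it is presumably a boundary search over an ordered sequence. A toy sketch of that idea over an in-memory list, with invented data (the real helper queries the events table inside a transaction):

# Toy illustration only: (stream_ordering, received_ts) pairs are ordered, so
# the "first stream ordering after a timestamp" cutoff can be found with a
# binary search. Sample data is invented.
import bisect

events = [(1, 1000), (2, 2000), (3, 5000), (4, 9000)]  # (stream_ordering, received_ts_ms)

def first_stream_ordering_after_ts(events, ts):
    """Return the smallest stream_ordering whose received_ts >= ts,
    or one past the end if every event is older."""
    timestamps = [received_ts for _, received_ts in events]
    idx = bisect.bisect_left(timestamps, ts)
    if idx == len(events):
        return events[-1][0] + 1
    return events[idx][0]

print(first_stream_ordering_after_ts(events, 4000))   # 3
print(first_stream_ordering_after_ts(events, 10000))  # 5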
@@ -539,6 +578,120 @@ class EventPushActionsStore(SQLBaseStore):
 
         return range_end
 
+    @defer.inlineCallbacks
+    def _rotate_notifs(self):
+        if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
+            return
+        self._doing_notif_rotation = True
+
+        try:
+            while True:
+                logger.info("Rotating notifications")
+
+                caught_up = yield self.runInteraction(
+                    "_rotate_notifs",
+                    self._rotate_notifs_txn
+                )
+                if caught_up:
+                    break
+                yield sleep(5)
+        finally:
+            self._doing_notif_rotation = False
+
+    def _rotate_notifs_txn(self, txn):
+        """Archives older notifications into event_push_summary. Returns whether
+        the archiving process has caught up or not.
+        """
+
+        # We want to make sure that we only ever do this one at a time
+        self.database_engine.lock_table(txn, "event_push_summary")
+
+        # We don't want to try and rotate millions of rows at once, so we cap
+        # the maximum stream ordering we'll rotate before.
+        txn.execute("""
+            SELECT stream_ordering FROM event_push_actions
+            ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
+        """)
+        stream_row = txn.fetchone()
+        if stream_row:
+            offset_stream_ordering, = stream_row
+            rotate_to_stream_ordering = min(
+                self.stream_ordering_day_ago, offset_stream_ordering
+            )
+            caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
+        else:
+            rotate_to_stream_ordering = self.stream_ordering_day_ago
+            caught_up = True
+
+        self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
+
+        # We have caught up iff we were limited by `stream_ordering_day_ago`
+        return caught_up
+
+    def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
+        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
+        # Calculate the new counts that should be upserted into event_push_summary
+        sql = """
+            SELECT user_id, room_id,
+                coalesce(old.notif_count, 0) + upd.notif_count,
+                upd.stream_ordering,
+                old.user_id
+            FROM (
+                SELECT user_id, room_id, count(*) as notif_count,
+                    max(stream_ordering) as stream_ordering
+                FROM event_push_actions
+                WHERE ? <= stream_ordering AND stream_ordering < ?
+                    AND highlight = 0
+                GROUP BY user_id, room_id
+            ) AS upd
+            LEFT JOIN event_push_summary AS old USING (user_id, room_id)
+        """
+
+        txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
+        rows = txn.fetchall()
+
+        # If the `old.user_id` above is NULL then we know there isn't already an
+        # entry in the table, so we simply insert it. Otherwise we update the
+        # existing row.
+        self._simple_insert_many_txn(
+            txn,
+            table="event_push_summary",
+            values=[
+                {
+                    "user_id": row[0],
+                    "room_id": row[1],
+                    "notif_count": row[2],
+                    "stream_ordering": row[3],
+                }
+                for row in rows if row[4] is None
+            ]
+        )
+
+        txn.executemany(
+            """
+                UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
+                WHERE user_id = ? AND room_id = ?
+            """,
+            ((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
+        )
+
+        txn.execute(
+            "DELETE FROM event_push_actions"
+            " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
+            (old_rotate_stream_ordering, rotate_to_stream_ordering,)
+        )
+
+        txn.execute(
+            "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
+            (rotate_to_stream_ordering,)
+        )
+
 
 def _action_has_highlight(actions):
     for action in actions:
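To make the rotation added above concrete, here is a self-contained sqlite3 sketch of the same upsert-then-delete dance as _rotate_notifs_before_txn: aggregate the un-rotated, non-highlight actions in the window, fold them into event_push_summary (inserting new rows and updating existing ones), delete the rotated actions, and advance the watermark in event_push_summary_stream_ordering. Table names and the SQL shape follow the patch; sqlite3 and the sample data are purely illustrative.

# Standalone sketch of the rotation step, using sqlite3 instead of Synapse's
# _simple_* transaction helpers. Sample rows are invented.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE event_push_actions (
        user_id TEXT, room_id TEXT, stream_ordering INTEGER, highlight INTEGER
    );
    CREATE TABLE event_push_summary (
        user_id TEXT, room_id TEXT, notif_count INTEGER, stream_ordering INTEGER
    );
    CREATE TABLE event_push_summary_stream_ordering (stream_ordering INTEGER);
    INSERT INTO event_push_summary_stream_ordering VALUES (0);
    INSERT INTO event_push_actions VALUES
        ('@a:hs', '!r:hs', 1, 0),
        ('@a:hs', '!r:hs', 2, 0),
        ('@a:hs', '!r:hs', 3, 1),   -- highlight: never rotated
        ('@b:hs', '!r:hs', 4, 0),
        ('@a:hs', '!r:hs', 12, 0);  -- beyond the rotation point: left alone
    INSERT INTO event_push_summary VALUES ('@a:hs', '!r:hs', 7, 0);
""")

def rotate_notifs_before(txn, rotate_to_stream_ordering):
    old_rotate_stream_ordering = txn.execute(
        "SELECT stream_ordering FROM event_push_summary_stream_ordering"
    ).fetchone()[0]

    # New per-(user, room) totals for the window, joined against any existing
    # summary row so we know whether to insert or update.
    rows = txn.execute("""
        SELECT user_id, room_id,
            coalesce(old.notif_count, 0) + upd.notif_count,
            upd.stream_ordering,
            old.user_id
        FROM (
            SELECT user_id, room_id, count(*) as notif_count,
                max(stream_ordering) as stream_ordering
            FROM event_push_actions
            WHERE ? <= stream_ordering AND stream_ordering < ?
                AND highlight = 0
            GROUP BY user_id, room_id
        ) AS upd
        LEFT JOIN event_push_summary AS old USING (user_id, room_id)
    """, (old_rotate_stream_ordering, rotate_to_stream_ordering)).fetchall()

    txn.executemany(
        "INSERT INTO event_push_summary VALUES (?, ?, ?, ?)",
        [(r[0], r[1], r[2], r[3]) for r in rows if r[4] is None],
    )
    txn.executemany(
        "UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?"
        " WHERE user_id = ? AND room_id = ?",
        [(r[2], r[3], r[0], r[1]) for r in rows if r[4] is not None],
    )
    txn.execute(
        "DELETE FROM event_push_actions"
        " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
        (old_rotate_stream_ordering, rotate_to_stream_ordering),
    )
    txn.execute(
        "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
        (rotate_to_stream_ordering,),
    )

cur = conn.cursor()
rotate_notifs_before(cur, rotate_to_stream_ordering=10)
print(cur.execute(
    "SELECT user_id, notif_count, stream_ordering FROM event_push_summary"
    " ORDER BY user_id"
).fetchall())
# -> [('@a:hs', 9, 2), ('@b:hs', 1, 4)]
print(cur.execute("SELECT count(*) FROM event_push_actions").fetchone()[0])  # 2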