|
|
|
@ -168,19 +168,13 @@ class EventPushActionsStore(SQLBaseStore): |
|
|
|
|
row = txn.fetchone() |
|
|
|
|
notify_count = row[0] if row else 0 |
|
|
|
|
|
|
|
|
|
summary_notif_count = self._simple_select_one_onecol_txn( |
|
|
|
|
txn, |
|
|
|
|
table="event_push_summary", |
|
|
|
|
keyvalues={ |
|
|
|
|
"user_id": user_id, |
|
|
|
|
"room_id": room_id, |
|
|
|
|
}, |
|
|
|
|
retcol="notif_count", |
|
|
|
|
allow_none=True, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if summary_notif_count: |
|
|
|
|
notify_count += summary_notif_count |
|
|
|
|
txn.execute(""" |
|
|
|
|
SELECT notif_count FROM event_push_summary |
|
|
|
|
WHERE room_id = ? AND user_id = ? AND stream_ordering > ? |
|
|
|
|
""", (room_id, user_id, stream_ordering,)) |
|
|
|
|
rows = txn.fetchall() |
|
|
|
|
if rows: |
|
|
|
|
notify_count += rows[0][0] |
|
|
|
|
|
|
|
|
|
# Now get the number of highlights |
|
|
|
|
sql = ( |
|
|
|
@ -645,12 +639,20 @@ class EventPushActionsStore(SQLBaseStore): |
|
|
|
|
# We want to make sure that we only ever do this one at a time |
|
|
|
|
self.database_engine.lock_table(txn, "event_push_summary") |
|
|
|
|
|
|
|
|
|
old_rotate_stream_ordering = self._simple_select_one_onecol_txn( |
|
|
|
|
txn, |
|
|
|
|
table="event_push_summary_stream_ordering", |
|
|
|
|
keyvalues={}, |
|
|
|
|
retcol="stream_ordering", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# We don't to try and rotate millions of rows at once, so we cap the |
|
|
|
|
# maximum stream ordering we'll rotate before. |
|
|
|
|
txn.execute(""" |
|
|
|
|
SELECT stream_ordering FROM event_push_actions |
|
|
|
|
WHERE stream_ordering > ? |
|
|
|
|
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 |
|
|
|
|
""") |
|
|
|
|
""", (old_rotate_stream_ordering,)) |
|
|
|
|
stream_row = txn.fetchone() |
|
|
|
|
if stream_row: |
|
|
|
|
offset_stream_ordering, = stream_row |
|
|
|
@ -662,6 +664,8 @@ class EventPushActionsStore(SQLBaseStore): |
|
|
|
|
rotate_to_stream_ordering = self.stream_ordering_day_ago |
|
|
|
|
caught_up = True |
|
|
|
|
|
|
|
|
|
logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering) |
|
|
|
|
|
|
|
|
|
self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) |
|
|
|
|
|
|
|
|
|
# We have caught up iff we were limited by `stream_ordering_day_ago` |
|
|
|
@ -695,6 +699,8 @@ class EventPushActionsStore(SQLBaseStore): |
|
|
|
|
txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,)) |
|
|
|
|
rows = txn.fetchall() |
|
|
|
|
|
|
|
|
|
logger.info("Rotating notifications, handling %d rows", len(rows)) |
|
|
|
|
|
|
|
|
|
# If the `old.user_id` above is NULL then we know there isn't already an |
|
|
|
|
# entry in the table, so we simply insert it. Otherwise we update the |
|
|
|
|
# existing table. |
|
|
|
@ -726,6 +732,8 @@ class EventPushActionsStore(SQLBaseStore): |
|
|
|
|
(old_rotate_stream_ordering, rotate_to_stream_ordering,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) |
|
|
|
|
|
|
|
|
|
txn.execute( |
|
|
|
|
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", |
|
|
|
|
(rotate_to_stream_ordering,) |
|
|
|
|