|
|
|
@ -871,23 +871,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
|
|
|
|
|
|
|
|
|
args.append(int(limit)) |
|
|
|
|
|
|
|
|
|
# Using DISTINCT in this SELECT query is quite expensive, because it requires the |
|
|
|
|
# engine to sort on the entire (not limited) result set, i.e. the entire events |
|
|
|
|
# table. We only need to use it when we're filtering on more than two labels, |
|
|
|
|
# because that's the only scenario in which we can possibly to get multiple times |
|
|
|
|
# the same event ID in the results. |
|
|
|
|
select_keywords = "SELECT" |
|
|
|
|
if event_filter and event_filter.labels and len(event_filter.labels) > 1: |
|
|
|
|
select_keywords += "DISTINCT" |
|
|
|
|
join_clause = "" |
|
|
|
|
if event_filter and event_filter.labels: |
|
|
|
|
# If we're not filtering on a label, then joining on event_labels will |
|
|
|
|
# return as many row for a single event as the number of labels it has. To |
|
|
|
|
# avoid this, only join if we're filtering on at least one label. |
|
|
|
|
join_clause = ( |
|
|
|
|
"LEFT JOIN event_labels" |
|
|
|
|
" USING (event_id, room_id, topological_ordering)" |
|
|
|
|
) |
|
|
|
|
if len(event_filter.labels) > 1: |
|
|
|
|
# Using DISTINCT in this SELECT query is quite expensive, because it |
|
|
|
|
# requires the engine to sort on the entire (not limited) result set, |
|
|
|
|
# i.e. the entire events table. We only need to use it when we're |
|
|
|
|
# filtering on more than two labels, because that's the only scenario |
|
|
|
|
# in which we can possibly to get multiple times the same event ID in |
|
|
|
|
# the results. |
|
|
|
|
select_keywords += "DISTINCT" |
|
|
|
|
|
|
|
|
|
sql = ( |
|
|
|
|
"%(select_keywords)s event_id, topological_ordering, stream_ordering" |
|
|
|
|
" FROM events" |
|
|
|
|
" LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)" |
|
|
|
|
" %(join_clause)s" |
|
|
|
|
" WHERE outlier = ? AND room_id = ? AND %(bounds)s" |
|
|
|
|
" ORDER BY topological_ordering %(order)s," |
|
|
|
|
" stream_ordering %(order)s LIMIT ?" |
|
|
|
|
) % {"select_keywords": select_keywords, "bounds": bounds, "order": order} |
|
|
|
|
) % { |
|
|
|
|
"select_keywords": select_keywords, |
|
|
|
|
"join_clause": join_clause, |
|
|
|
|
"bounds": bounds, |
|
|
|
|
"order": order |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
txn.execute(sql, args) |
|
|
|
|
|
|
|
|
|