|
|
|
@ -32,7 +32,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore |
|
|
|
|
from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException |
|
|
|
|
from synapse.storage.util.id_generators import ChainedIdGenerator |
|
|
|
|
from synapse.util import json_encoder |
|
|
|
|
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList |
|
|
|
|
from synapse.util.caches.descriptors import cached, cachedList |
|
|
|
|
from synapse.util.caches.stream_change_cache import StreamChangeCache |
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
@ -115,9 +115,9 @@ class PushRulesWorkerStore( |
|
|
|
|
""" |
|
|
|
|
raise NotImplementedError() |
|
|
|
|
|
|
|
|
|
@cachedInlineCallbacks(max_entries=5000) |
|
|
|
|
def get_push_rules_for_user(self, user_id): |
|
|
|
|
rows = yield self.db_pool.simple_select_list( |
|
|
|
|
@cached(max_entries=5000) |
|
|
|
|
async def get_push_rules_for_user(self, user_id): |
|
|
|
|
rows = await self.db_pool.simple_select_list( |
|
|
|
|
table="push_rules", |
|
|
|
|
keyvalues={"user_name": user_id}, |
|
|
|
|
retcols=( |
|
|
|
@ -133,17 +133,15 @@ class PushRulesWorkerStore( |
|
|
|
|
|
|
|
|
|
rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) |
|
|
|
|
|
|
|
|
|
enabled_map = yield self.get_push_rules_enabled_for_user(user_id) |
|
|
|
|
enabled_map = await self.get_push_rules_enabled_for_user(user_id) |
|
|
|
|
|
|
|
|
|
use_new_defaults = user_id in self._users_new_default_push_rules |
|
|
|
|
|
|
|
|
|
rules = _load_rules(rows, enabled_map, use_new_defaults) |
|
|
|
|
return _load_rules(rows, enabled_map, use_new_defaults) |
|
|
|
|
|
|
|
|
|
return rules |
|
|
|
|
|
|
|
|
|
@cachedInlineCallbacks(max_entries=5000) |
|
|
|
|
def get_push_rules_enabled_for_user(self, user_id): |
|
|
|
|
results = yield self.db_pool.simple_select_list( |
|
|
|
|
@cached(max_entries=5000) |
|
|
|
|
async def get_push_rules_enabled_for_user(self, user_id): |
|
|
|
|
results = await self.db_pool.simple_select_list( |
|
|
|
|
table="push_rules_enable", |
|
|
|
|
keyvalues={"user_name": user_id}, |
|
|
|
|
retcols=("user_name", "rule_id", "enabled"), |
|
|
|
@ -202,14 +200,15 @@ class PushRulesWorkerStore( |
|
|
|
|
|
|
|
|
|
return results |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): |
|
|
|
|
async def copy_push_rule_from_room_to_room( |
|
|
|
|
self, new_room_id: str, user_id: str, rule: dict |
|
|
|
|
) -> None: |
|
|
|
|
"""Copy a single push rule from one room to another for a specific user. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
new_room_id (str): ID of the new room. |
|
|
|
|
user_id (str): ID of user the push rule belongs to. |
|
|
|
|
rule (Dict): A push rule. |
|
|
|
|
new_room_id: ID of the new room. |
|
|
|
|
user_id : ID of user the push rule belongs to. |
|
|
|
|
rule: A push rule. |
|
|
|
|
""" |
|
|
|
|
# Create new rule id |
|
|
|
|
rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) |
|
|
|
@ -221,7 +220,7 @@ class PushRulesWorkerStore( |
|
|
|
|
condition["pattern"] = new_room_id |
|
|
|
|
|
|
|
|
|
# Add the rule for the new room |
|
|
|
|
yield self.add_push_rule( |
|
|
|
|
await self.add_push_rule( |
|
|
|
|
user_id=user_id, |
|
|
|
|
rule_id=new_rule_id, |
|
|
|
|
priority_class=rule["priority_class"], |
|
|
|
@ -229,20 +228,19 @@ class PushRulesWorkerStore( |
|
|
|
|
actions=rule["actions"], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def copy_push_rules_from_room_to_room_for_user( |
|
|
|
|
self, old_room_id, new_room_id, user_id |
|
|
|
|
): |
|
|
|
|
async def copy_push_rules_from_room_to_room_for_user( |
|
|
|
|
self, old_room_id: str, new_room_id: str, user_id: str |
|
|
|
|
) -> None: |
|
|
|
|
"""Copy all of the push rules from one room to another for a specific |
|
|
|
|
user. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
old_room_id (str): ID of the old room. |
|
|
|
|
new_room_id (str): ID of the new room. |
|
|
|
|
user_id (str): ID of user to copy push rules for. |
|
|
|
|
old_room_id: ID of the old room. |
|
|
|
|
new_room_id: ID of the new room. |
|
|
|
|
user_id: ID of user to copy push rules for. |
|
|
|
|
""" |
|
|
|
|
# Retrieve push rules for this user |
|
|
|
|
user_push_rules = yield self.get_push_rules_for_user(user_id) |
|
|
|
|
user_push_rules = await self.get_push_rules_for_user(user_id) |
|
|
|
|
|
|
|
|
|
# Get rules relating to the old room and copy them to the new room |
|
|
|
|
for rule in user_push_rules: |
|
|
|
@ -251,7 +249,7 @@ class PushRulesWorkerStore( |
|
|
|
|
(c.get("key") == "room_id" and c.get("pattern") == old_room_id) |
|
|
|
|
for c in conditions |
|
|
|
|
): |
|
|
|
|
yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) |
|
|
|
|
await self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) |
|
|
|
|
|
|
|
|
|
@cachedList( |
|
|
|
|
cached_method_name="get_push_rules_enabled_for_user", |
|
|
|
@ -328,8 +326,7 @@ class PushRulesWorkerStore( |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def add_push_rule( |
|
|
|
|
async def add_push_rule( |
|
|
|
|
self, |
|
|
|
|
user_id, |
|
|
|
|
rule_id, |
|
|
|
@ -338,13 +335,13 @@ class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
actions, |
|
|
|
|
before=None, |
|
|
|
|
after=None, |
|
|
|
|
): |
|
|
|
|
) -> None: |
|
|
|
|
conditions_json = json_encoder.encode(conditions) |
|
|
|
|
actions_json = json_encoder.encode(actions) |
|
|
|
|
with self._push_rules_stream_id_gen.get_next() as ids: |
|
|
|
|
stream_id, event_stream_ordering = ids |
|
|
|
|
if before or after: |
|
|
|
|
yield self.db_pool.runInteraction( |
|
|
|
|
await self.db_pool.runInteraction( |
|
|
|
|
"_add_push_rule_relative_txn", |
|
|
|
|
self._add_push_rule_relative_txn, |
|
|
|
|
stream_id, |
|
|
|
@ -358,7 +355,7 @@ class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
after, |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
yield self.db_pool.runInteraction( |
|
|
|
|
await self.db_pool.runInteraction( |
|
|
|
|
"_add_push_rule_highest_priority_txn", |
|
|
|
|
self._add_push_rule_highest_priority_txn, |
|
|
|
|
stream_id, |
|
|
|
@ -542,16 +539,15 @@ class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
}, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def delete_push_rule(self, user_id, rule_id): |
|
|
|
|
async def delete_push_rule(self, user_id: str, rule_id: str) -> None: |
|
|
|
|
""" |
|
|
|
|
Delete a push rule. Args specify the row to be deleted and can be |
|
|
|
|
any of the columns in the push_rule table, but below are the |
|
|
|
|
standard ones |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
user_id (str): The matrix ID of the push rule owner |
|
|
|
|
rule_id (str): The rule_id of the rule to be deleted |
|
|
|
|
user_id: The matrix ID of the push rule owner |
|
|
|
|
rule_id: The rule_id of the rule to be deleted |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
def delete_push_rule_txn(txn, stream_id, event_stream_ordering): |
|
|
|
@ -565,18 +561,17 @@ class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
|
|
|
|
|
with self._push_rules_stream_id_gen.get_next() as ids: |
|
|
|
|
stream_id, event_stream_ordering = ids |
|
|
|
|
yield self.db_pool.runInteraction( |
|
|
|
|
await self.db_pool.runInteraction( |
|
|
|
|
"delete_push_rule", |
|
|
|
|
delete_push_rule_txn, |
|
|
|
|
stream_id, |
|
|
|
|
event_stream_ordering, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def set_push_rule_enabled(self, user_id, rule_id, enabled): |
|
|
|
|
async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None: |
|
|
|
|
with self._push_rules_stream_id_gen.get_next() as ids: |
|
|
|
|
stream_id, event_stream_ordering = ids |
|
|
|
|
yield self.db_pool.runInteraction( |
|
|
|
|
await self.db_pool.runInteraction( |
|
|
|
|
"_set_push_rule_enabled_txn", |
|
|
|
|
self._set_push_rule_enabled_txn, |
|
|
|
|
stream_id, |
|
|
|
@ -607,8 +602,9 @@ class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
op="ENABLE" if enabled else "DISABLE", |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): |
|
|
|
|
async def set_push_rule_actions( |
|
|
|
|
self, user_id, rule_id, actions, is_default_rule |
|
|
|
|
) -> None: |
|
|
|
|
actions_json = json_encoder.encode(actions) |
|
|
|
|
|
|
|
|
|
def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): |
|
|
|
@ -649,7 +645,7 @@ class PushRuleStore(PushRulesWorkerStore): |
|
|
|
|
|
|
|
|
|
with self._push_rules_stream_id_gen.get_next() as ids: |
|
|
|
|
stream_id, event_stream_ordering = ids |
|
|
|
|
yield self.db_pool.runInteraction( |
|
|
|
|
await self.db_pool.runInteraction( |
|
|
|
|
"set_push_rule_actions", |
|
|
|
|
set_push_rule_actions_txn, |
|
|
|
|
stream_id, |
|
|
|
|