|
|
|
@ -22,6 +22,7 @@ from typing import ( |
|
|
|
|
Iterable, |
|
|
|
|
List, |
|
|
|
|
Mapping, |
|
|
|
|
MutableMapping, |
|
|
|
|
Optional, |
|
|
|
|
Union, |
|
|
|
|
) |
|
|
|
@ -580,10 +581,20 @@ class EventClientSerializer: |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def copy_power_levels_contents( |
|
|
|
|
old_power_levels: Mapping[str, Union[int, Mapping[str, int]]] |
|
|
|
|
_PowerLevel = Union[str, int] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def copy_and_fixup_power_levels_contents( |
|
|
|
|
old_power_levels: Mapping[str, Union[_PowerLevel, Mapping[str, _PowerLevel]]] |
|
|
|
|
) -> Dict[str, Union[int, Dict[str, int]]]: |
|
|
|
|
"""Copy the content of a power_levels event, unfreezing frozendicts along the way |
|
|
|
|
"""Copy the content of a power_levels event, unfreezing frozendicts along the way. |
|
|
|
|
|
|
|
|
|
We accept as input power level values which are strings, provided they represent an |
|
|
|
|
integer, e.g. `"`100"` instead of 100. Such strings are converted to integers |
|
|
|
|
in the returned dictionary (hence "fixup" in the function name). |
|
|
|
|
|
|
|
|
|
Note that future room versions will outlaw such stringy power levels (see |
|
|
|
|
https://github.com/matrix-org/matrix-spec/issues/853). |
|
|
|
|
|
|
|
|
|
Raises: |
|
|
|
|
TypeError if the input does not look like a valid power levels event content |
|
|
|
@ -592,29 +603,47 @@ def copy_power_levels_contents( |
|
|
|
|
raise TypeError("Not a valid power-levels content: %r" % (old_power_levels,)) |
|
|
|
|
|
|
|
|
|
power_levels: Dict[str, Union[int, Dict[str, int]]] = {} |
|
|
|
|
for k, v in old_power_levels.items(): |
|
|
|
|
|
|
|
|
|
if isinstance(v, int): |
|
|
|
|
power_levels[k] = v |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
for k, v in old_power_levels.items(): |
|
|
|
|
if isinstance(v, collections.abc.Mapping): |
|
|
|
|
h: Dict[str, int] = {} |
|
|
|
|
power_levels[k] = h |
|
|
|
|
for k1, v1 in v.items(): |
|
|
|
|
# we should only have one level of nesting |
|
|
|
|
if not isinstance(v1, int): |
|
|
|
|
raise TypeError( |
|
|
|
|
"Invalid power_levels value for %s.%s: %r" % (k, k1, v1) |
|
|
|
|
) |
|
|
|
|
h[k1] = v1 |
|
|
|
|
continue |
|
|
|
|
_copy_power_level_value_as_integer(v1, h, k1) |
|
|
|
|
|
|
|
|
|
raise TypeError("Invalid power_levels value for %s: %r" % (k, v)) |
|
|
|
|
else: |
|
|
|
|
_copy_power_level_value_as_integer(v, power_levels, k) |
|
|
|
|
|
|
|
|
|
return power_levels |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _copy_power_level_value_as_integer( |
|
|
|
|
old_value: object, |
|
|
|
|
power_levels: MutableMapping[str, Any], |
|
|
|
|
key: str, |
|
|
|
|
) -> None: |
|
|
|
|
"""Set `power_levels[key]` to the integer represented by `old_value`. |
|
|
|
|
|
|
|
|
|
:raises TypeError: if `old_value` is not an integer, nor a base-10 string |
|
|
|
|
representation of an integer. |
|
|
|
|
""" |
|
|
|
|
if isinstance(old_value, int): |
|
|
|
|
power_levels[key] = old_value |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
if isinstance(old_value, str): |
|
|
|
|
try: |
|
|
|
|
parsed_value = int(old_value, base=10) |
|
|
|
|
except ValueError: |
|
|
|
|
# Fall through to the final TypeError. |
|
|
|
|
pass |
|
|
|
|
else: |
|
|
|
|
power_levels[key] = parsed_value |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
raise TypeError(f"Invalid power_levels value for {key}: {old_value}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_canonicaljson(value: Any) -> None: |
|
|
|
|
""" |
|
|
|
|
Ensure that the JSON object is valid according to the rules of canonical JSON. |
|
|
|
|