|
|
|
@ -11,7 +11,7 @@ |
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
|
|
|
# See the License for the specific language governing permissions and |
|
|
|
|
# limitations under the License. |
|
|
|
|
from typing import Any, List, Mapping, Sequence, Union |
|
|
|
|
from typing import Any, List, Mapping, Optional, Sequence, Union |
|
|
|
|
from unittest.mock import Mock |
|
|
|
|
|
|
|
|
|
from twisted.test.proto_helpers import MemoryReactor |
|
|
|
@ -22,6 +22,7 @@ from synapse.types import JsonDict |
|
|
|
|
from synapse.util import Clock |
|
|
|
|
|
|
|
|
|
from tests import unittest |
|
|
|
|
from tests.unittest import override_config |
|
|
|
|
|
|
|
|
|
PROTOCOL = "myproto" |
|
|
|
|
TOKEN = "myastoken" |
|
|
|
@ -39,7 +40,7 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): |
|
|
|
|
hs_token=TOKEN, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def test_query_3pe_authenticates_token(self) -> None: |
|
|
|
|
def test_query_3pe_authenticates_token_via_header(self) -> None: |
|
|
|
|
""" |
|
|
|
|
Tests that 3pe queries to the appservice are authenticated |
|
|
|
|
with the appservice's token. |
|
|
|
@ -74,12 +75,88 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): |
|
|
|
|
args: Mapping[Any, Any], |
|
|
|
|
headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]], |
|
|
|
|
) -> List[JsonDict]: |
|
|
|
|
# Ensure the access token is passed as both a header and query arg. |
|
|
|
|
if not headers.get("Authorization") or not args.get(b"access_token"): |
|
|
|
|
# Ensure the access token is passed as a header. |
|
|
|
|
if not headers or not headers.get("Authorization"): |
|
|
|
|
raise RuntimeError("Access token not provided") |
|
|
|
|
# ... and not as a query param |
|
|
|
|
if b"access_token" in args: |
|
|
|
|
raise RuntimeError( |
|
|
|
|
"Access token should not be passed as a query param." |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) |
|
|
|
|
self.request_url = url |
|
|
|
|
if url == URL_USER: |
|
|
|
|
return SUCCESS_RESULT_USER |
|
|
|
|
elif url == URL_LOCATION: |
|
|
|
|
return SUCCESS_RESULT_LOCATION |
|
|
|
|
else: |
|
|
|
|
raise RuntimeError( |
|
|
|
|
"URL provided was invalid. This should never be seen." |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# We assign to a method, which mypy doesn't like. |
|
|
|
|
self.api.get_json = Mock(side_effect=get_json) # type: ignore[assignment] |
|
|
|
|
|
|
|
|
|
result = self.get_success( |
|
|
|
|
self.api.query_3pe(self.service, "user", PROTOCOL, {b"some": [b"field"]}) |
|
|
|
|
) |
|
|
|
|
self.assertEqual(self.request_url, URL_USER) |
|
|
|
|
self.assertEqual(result, SUCCESS_RESULT_USER) |
|
|
|
|
result = self.get_success( |
|
|
|
|
self.api.query_3pe( |
|
|
|
|
self.service, "location", PROTOCOL, {b"some": [b"field"]} |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
self.assertEqual(self.request_url, URL_LOCATION) |
|
|
|
|
self.assertEqual(result, SUCCESS_RESULT_LOCATION) |
|
|
|
|
|
|
|
|
|
@override_config({"use_appservice_legacy_authorization": True}) |
|
|
|
|
def test_query_3pe_authenticates_token_via_param(self) -> None: |
|
|
|
|
""" |
|
|
|
|
Tests that 3pe queries to the appservice are authenticated |
|
|
|
|
with the appservice's token. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
SUCCESS_RESULT_USER = [ |
|
|
|
|
{ |
|
|
|
|
"protocol": PROTOCOL, |
|
|
|
|
"userid": "@a:user", |
|
|
|
|
"fields": { |
|
|
|
|
"more": "fields", |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
] |
|
|
|
|
SUCCESS_RESULT_LOCATION = [ |
|
|
|
|
{ |
|
|
|
|
"protocol": PROTOCOL, |
|
|
|
|
"alias": "#a:room", |
|
|
|
|
"fields": { |
|
|
|
|
"more": "fields", |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
URL_USER = f"{URL}/_matrix/app/v1/thirdparty/user/{PROTOCOL}" |
|
|
|
|
URL_LOCATION = f"{URL}/_matrix/app/v1/thirdparty/location/{PROTOCOL}" |
|
|
|
|
|
|
|
|
|
self.request_url = None |
|
|
|
|
|
|
|
|
|
async def get_json( |
|
|
|
|
url: str, |
|
|
|
|
args: Mapping[Any, Any], |
|
|
|
|
headers: Optional[ |
|
|
|
|
Mapping[Union[str, bytes], Sequence[Union[str, bytes]]] |
|
|
|
|
] = None, |
|
|
|
|
) -> List[JsonDict]: |
|
|
|
|
# Ensure the access token is passed as a both a query param and in the headers. |
|
|
|
|
if not args.get(b"access_token"): |
|
|
|
|
raise RuntimeError("Access token should be provided in query params.") |
|
|
|
|
if not headers or not headers.get("Authorization"): |
|
|
|
|
raise RuntimeError("Access token should be provided in auth headers.") |
|
|
|
|
|
|
|
|
|
self.assertEqual(args.get(b"access_token"), TOKEN) |
|
|
|
|
self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) |
|
|
|
|
self.request_url = url |
|
|
|
|
if url == URL_USER: |
|
|
|
|
return SUCCESS_RESULT_USER |
|
|
|
|