|
|
|
@ -15,7 +15,7 @@ |
|
|
|
|
|
|
|
|
|
import time |
|
|
|
|
import urllib.parse |
|
|
|
|
from typing import Any, Dict, List, Union |
|
|
|
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
|
from urllib.parse import urlencode |
|
|
|
|
|
|
|
|
|
from mock import Mock |
|
|
|
@ -47,8 +47,14 @@ except ImportError: |
|
|
|
|
HAS_JWT = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# public_base_url used in some tests |
|
|
|
|
BASE_URL = "https://synapse/" |
|
|
|
|
# synapse server name: used to populate public_baseurl in some tests |
|
|
|
|
SYNAPSE_SERVER_PUBLIC_HOSTNAME = "synapse" |
|
|
|
|
|
|
|
|
|
# public_baseurl for some tests. It uses an http:// scheme because |
|
|
|
|
# FakeChannel.isSecure() returns False, so synapse will see the requested uri as |
|
|
|
|
# http://..., so using http in the public_baseurl stops Synapse trying to redirect to |
|
|
|
|
# https://.... |
|
|
|
|
BASE_URL = "http://%s/" % (SYNAPSE_SERVER_PUBLIC_HOSTNAME,) |
|
|
|
|
|
|
|
|
|
# CAS server used in some tests |
|
|
|
|
CAS_SERVER = "https://fake.test" |
|
|
|
@ -480,11 +486,7 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): |
|
|
|
|
def test_multi_sso_redirect(self): |
|
|
|
|
"""/login/sso/redirect should redirect to an identity picker""" |
|
|
|
|
# first hit the redirect url, which should redirect to our idp picker |
|
|
|
|
channel = self.make_request( |
|
|
|
|
"GET", |
|
|
|
|
"/_matrix/client/r0/login/sso/redirect?redirectUrl=" |
|
|
|
|
+ urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL), |
|
|
|
|
) |
|
|
|
|
channel = self._make_sso_redirect_request(False, None) |
|
|
|
|
self.assertEqual(channel.code, 302, channel.result) |
|
|
|
|
uri = channel.headers.getRawHeaders("Location")[0] |
|
|
|
|
|
|
|
|
@ -628,34 +630,21 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): |
|
|
|
|
|
|
|
|
|
def test_client_idp_redirect_msc2858_disabled(self): |
|
|
|
|
"""If the client tries to pick an IdP but MSC2858 is disabled, return a 400""" |
|
|
|
|
channel = self.make_request( |
|
|
|
|
"GET", |
|
|
|
|
"/_matrix/client/unstable/org.matrix.msc2858/login/sso/redirect/oidc?redirectUrl=" |
|
|
|
|
+ urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL), |
|
|
|
|
) |
|
|
|
|
channel = self._make_sso_redirect_request(True, "oidc") |
|
|
|
|
self.assertEqual(channel.code, 400, channel.result) |
|
|
|
|
self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") |
|
|
|
|
|
|
|
|
|
@override_config({"experimental_features": {"msc2858_enabled": True}}) |
|
|
|
|
def test_client_idp_redirect_to_unknown(self): |
|
|
|
|
"""If the client tries to pick an unknown IdP, return a 404""" |
|
|
|
|
channel = self.make_request( |
|
|
|
|
"GET", |
|
|
|
|
"/_matrix/client/unstable/org.matrix.msc2858/login/sso/redirect/xxx?redirectUrl=" |
|
|
|
|
+ urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL), |
|
|
|
|
) |
|
|
|
|
channel = self._make_sso_redirect_request(True, "xxx") |
|
|
|
|
self.assertEqual(channel.code, 404, channel.result) |
|
|
|
|
self.assertEqual(channel.json_body["errcode"], "M_NOT_FOUND") |
|
|
|
|
|
|
|
|
|
@override_config({"experimental_features": {"msc2858_enabled": True}}) |
|
|
|
|
def test_client_idp_redirect_to_oidc(self): |
|
|
|
|
"""If the client pick a known IdP, redirect to it""" |
|
|
|
|
channel = self.make_request( |
|
|
|
|
"GET", |
|
|
|
|
"/_matrix/client/unstable/org.matrix.msc2858/login/sso/redirect/oidc?redirectUrl=" |
|
|
|
|
+ urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
channel = self._make_sso_redirect_request(True, "oidc") |
|
|
|
|
self.assertEqual(channel.code, 302, channel.result) |
|
|
|
|
oidc_uri = channel.headers.getRawHeaders("Location")[0] |
|
|
|
|
oidc_uri_path, oidc_uri_query = oidc_uri.split("?", 1) |
|
|
|
@ -663,6 +652,30 @@ class MultiSSOTestCase(unittest.HomeserverTestCase): |
|
|
|
|
# it should redirect us to the auth page of the OIDC server |
|
|
|
|
self.assertEqual(oidc_uri_path, TEST_OIDC_AUTH_ENDPOINT) |
|
|
|
|
|
|
|
|
|
def _make_sso_redirect_request( |
|
|
|
|
self, unstable_endpoint: bool = False, idp_prov: Optional[str] = None |
|
|
|
|
): |
|
|
|
|
"""Send a request to /_matrix/client/r0/login/sso/redirect |
|
|
|
|
|
|
|
|
|
... or the unstable equivalent |
|
|
|
|
|
|
|
|
|
... possibly specifying an IDP provider |
|
|
|
|
""" |
|
|
|
|
endpoint = ( |
|
|
|
|
"/_matrix/client/unstable/org.matrix.msc2858/login/sso/redirect" |
|
|
|
|
if unstable_endpoint |
|
|
|
|
else "/_matrix/client/r0/login/sso/redirect" |
|
|
|
|
) |
|
|
|
|
if idp_prov is not None: |
|
|
|
|
endpoint += "/" + idp_prov |
|
|
|
|
endpoint += "?redirectUrl=" + urllib.parse.quote_plus(TEST_CLIENT_REDIRECT_URL) |
|
|
|
|
|
|
|
|
|
return self.make_request( |
|
|
|
|
"GET", |
|
|
|
|
endpoint, |
|
|
|
|
custom_headers=[("Host", SYNAPSE_SERVER_PUBLIC_HOSTNAME)], |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def _get_value_from_macaroon(macaroon: pymacaroons.Macaroon, key: str) -> str: |
|
|
|
|
prefix = key + " = " |
|
|
|
|