You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
watcha-synapse/tests/test_utils/oidc.py

348 lines
12 KiB

# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Any, ContextManager, Dict, List, Optional, Tuple
from unittest.mock import Mock, patch
from urllib.parse import parse_qs
import attr
from twisted.web.http_headers import Headers
from twisted.web.iweb import IResponse
from synapse.server import HomeServer
from synapse.util import Clock
from synapse.util.stringutils import random_string
from tests.test_utils import FakeResponse
@attr.s(slots=True, frozen=True, auto_attribs=True)
class FakeAuthorizationGrant:
userinfo: dict
client_id: str
redirect_uri: str
scope: str
nonce: Optional[str]
sid: Optional[str]
class FakeOidcServer:
"""A fake OpenID Connect Provider."""
# All methods here are mocks, so we can track when they are called, and override
# their values
request: Mock
get_jwks_handler: Mock
get_metadata_handler: Mock
get_userinfo_handler: Mock
post_token_handler: Mock
sid_counter: int = 0
def __init__(self, clock: Clock, issuer: str):
from authlib.jose import ECKey, KeySet
self._clock = clock
self.issuer = issuer
self.request = Mock(side_effect=self._request)
self.get_jwks_handler = Mock(side_effect=self._get_jwks_handler)
self.get_metadata_handler = Mock(side_effect=self._get_metadata_handler)
self.get_userinfo_handler = Mock(side_effect=self._get_userinfo_handler)
self.post_token_handler = Mock(side_effect=self._post_token_handler)
# A code -> grant mapping
self._authorization_grants: Dict[str, FakeAuthorizationGrant] = {}
# An access token -> grant mapping
self._sessions: Dict[str, FakeAuthorizationGrant] = {}
# We generate here an ECDSA key with the P-256 curve (ES256 algorithm) used for
# signing JWTs. ECDSA keys are really quick to generate compared to RSA.
self._key = ECKey.generate_key(crv="P-256", is_private=True)
self._jwks = KeySet([ECKey.import_key(self._key.as_pem(is_private=False))])
self._id_token_overrides: Dict[str, Any] = {}
def reset_mocks(self) -> None:
self.request.reset_mock()
self.get_jwks_handler.reset_mock()
self.get_metadata_handler.reset_mock()
self.get_userinfo_handler.reset_mock()
self.post_token_handler.reset_mock()
def patch_homeserver(self, hs: HomeServer) -> ContextManager[Mock]:
"""Patch the ``HomeServer`` HTTP client to handle requests through the ``FakeOidcServer``.
This patch should be used whenever the HS is expected to perform request to the
OIDC provider, e.g.::
fake_oidc_server = self.helper.fake_oidc_server()
with fake_oidc_server.patch_homeserver(hs):
self.make_request("GET", "/_matrix/client/r0/login/sso/redirect")
"""
return patch.object(hs.get_proxied_http_client(), "request", self.request)
@property
def authorization_endpoint(self) -> str:
return self.issuer + "authorize"
@property
def token_endpoint(self) -> str:
return self.issuer + "token"
@property
def userinfo_endpoint(self) -> str:
return self.issuer + "userinfo"
@property
def metadata_endpoint(self) -> str:
return self.issuer + ".well-known/openid-configuration"
@property
def jwks_uri(self) -> str:
return self.issuer + "jwks"
def get_metadata(self) -> dict:
return {
"issuer": self.issuer,
"authorization_endpoint": self.authorization_endpoint,
"token_endpoint": self.token_endpoint,
"jwks_uri": self.jwks_uri,
"userinfo_endpoint": self.userinfo_endpoint,
"response_types_supported": ["code"],
"subject_types_supported": ["public"],
"id_token_signing_alg_values_supported": ["ES256"],
}
def get_jwks(self) -> dict:
return self._jwks.as_dict()
def get_userinfo(self, access_token: str) -> Optional[dict]:
"""Given an access token, get the userinfo of the associated session."""
session = self._sessions.get(access_token, None)
if session is None:
return None
return session.userinfo
def _sign(self, payload: dict) -> str:
from authlib.jose import JsonWebSignature
jws = JsonWebSignature()
kid = self.get_jwks()["keys"][0]["kid"]
protected = {"alg": "ES256", "kid": kid}
json_payload = json.dumps(payload)
return jws.serialize_compact(protected, json_payload, self._key).decode("utf-8")
def generate_id_token(self, grant: FakeAuthorizationGrant) -> str:
now = int(self._clock.time())
id_token = {
**grant.userinfo,
"iss": self.issuer,
"aud": grant.client_id,
"iat": now,
"nbf": now,
"exp": now + 600,
}
if grant.nonce is not None:
id_token["nonce"] = grant.nonce
if grant.sid is not None:
id_token["sid"] = grant.sid
id_token.update(self._id_token_overrides)
return self._sign(id_token)
def generate_logout_token(self, grant: FakeAuthorizationGrant) -> str:
now = int(self._clock.time())
logout_token = {
"iss": self.issuer,
"aud": grant.client_id,
"iat": now,
"jti": random_string(10),
"events": {
"http://schemas.openid.net/event/backchannel-logout": {},
},
}
if grant.sid is not None:
logout_token["sid"] = grant.sid
if "sub" in grant.userinfo:
logout_token["sub"] = grant.userinfo["sub"]
return self._sign(logout_token)
def id_token_override(self, overrides: dict) -> ContextManager[dict]:
"""Temporarily patch the ID token generated by the token endpoint."""
return patch.object(self, "_id_token_overrides", overrides)
def start_authorization(
self,
client_id: str,
scope: str,
redirect_uri: str,
userinfo: dict,
nonce: Optional[str] = None,
with_sid: bool = False,
) -> Tuple[str, FakeAuthorizationGrant]:
"""Start an authorization request, and get back the code to use on the authorization endpoint."""
code = random_string(10)
sid = None
if with_sid:
sid = str(self.sid_counter)
self.sid_counter += 1
grant = FakeAuthorizationGrant(
userinfo=userinfo,
scope=scope,
redirect_uri=redirect_uri,
nonce=nonce,
client_id=client_id,
sid=sid,
)
self._authorization_grants[code] = grant
return code, grant
def exchange_code(self, code: str) -> Optional[Dict[str, Any]]:
grant = self._authorization_grants.pop(code, None)
if grant is None:
return None
access_token = random_string(10)
self._sessions[access_token] = grant
token = {
"token_type": "Bearer",
"access_token": access_token,
"expires_in": 3600,
"scope": grant.scope,
}
if "openid" in grant.scope:
token["id_token"] = self.generate_id_token(grant)
return dict(token)
def buggy_endpoint(
self,
*,
jwks: bool = False,
metadata: bool = False,
token: bool = False,
userinfo: bool = False,
) -> ContextManager[Dict[str, Mock]]:
"""A context which makes a set of endpoints return a 500 error.
Args:
jwks: If True, makes the JWKS endpoint return a 500 error.
metadata: If True, makes the OIDC Discovery endpoint return a 500 error.
token: If True, makes the token endpoint return a 500 error.
userinfo: If True, makes the userinfo endpoint return a 500 error.
"""
buggy = FakeResponse(code=500, body=b"Internal server error")
patches = {}
if jwks:
patches["get_jwks_handler"] = Mock(return_value=buggy)
if metadata:
patches["get_metadata_handler"] = Mock(return_value=buggy)
if token:
patches["post_token_handler"] = Mock(return_value=buggy)
if userinfo:
patches["get_userinfo_handler"] = Mock(return_value=buggy)
return patch.multiple(self, **patches)
async def _request(
self,
method: str,
uri: str,
data: Optional[bytes] = None,
headers: Optional[Headers] = None,
) -> IResponse:
"""The override of the SimpleHttpClient#request() method"""
access_token: Optional[str] = None
if headers is None:
headers = Headers()
# Try to find the access token in the headers if any
auth_headers = headers.getRawHeaders(b"Authorization")
if auth_headers:
parts = auth_headers[0].split(b" ")
if parts[0] == b"Bearer" and len(parts) == 2:
access_token = parts[1].decode("ascii")
if method == "POST":
# If the method is POST, assume it has an url-encoded body
if data is None or headers.getRawHeaders(b"Content-Type") != [
b"application/x-www-form-urlencoded"
]:
return FakeResponse.json(code=400, payload={"error": "invalid_request"})
params = parse_qs(data.decode("utf-8"))
if uri == self.token_endpoint:
# Even though this endpoint should be protected, this does not check
# for client authentication. We're not checking it for simplicity,
# and because client authentication is tested in other standalone tests.
return self.post_token_handler(params)
elif method == "GET":
if uri == self.jwks_uri:
return self.get_jwks_handler()
elif uri == self.metadata_endpoint:
return self.get_metadata_handler()
elif uri == self.userinfo_endpoint:
return self.get_userinfo_handler(access_token=access_token)
return FakeResponse(code=404, body=b"404 not found")
# Request handlers
def _get_jwks_handler(self) -> IResponse:
"""Handles requests to the JWKS URI."""
return FakeResponse.json(payload=self.get_jwks())
def _get_metadata_handler(self) -> IResponse:
"""Handles requests to the OIDC well-known document."""
return FakeResponse.json(payload=self.get_metadata())
def _get_userinfo_handler(self, access_token: Optional[str]) -> IResponse:
"""Handles requests to the userinfo endpoint."""
if access_token is None:
return FakeResponse(code=401)
user_info = self.get_userinfo(access_token)
if user_info is None:
return FakeResponse(code=401)
return FakeResponse.json(payload=user_info)
def _post_token_handler(self, params: Dict[str, List[str]]) -> IResponse:
"""Handles requests to the token endpoint."""
code = params.get("code", [])
if len(code) != 1:
return FakeResponse.json(code=400, payload={"error": "invalid_request"})
grant = self.exchange_code(code=code[0])
if grant is None:
return FakeResponse.json(code=400, payload={"error": "invalid_grant"})
return FakeResponse.json(payload=grant)