|
|
|
@ -24,6 +24,7 @@ import pkg_resources |
|
|
|
|
|
|
|
|
|
import synapse.rest.admin |
|
|
|
|
from synapse.api.constants import LoginType, Membership |
|
|
|
|
from synapse.api.errors import Codes |
|
|
|
|
from synapse.rest.client.v1 import login, room |
|
|
|
|
from synapse.rest.client.v2_alpha import account, register |
|
|
|
|
|
|
|
|
@ -325,3 +326,304 @@ class DeactivateTestCase(unittest.HomeserverTestCase): |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEqual(request.code, 200) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ThreepidEmailRestTestCase(unittest.HomeserverTestCase): |
|
|
|
|
|
|
|
|
|
servlets = [ |
|
|
|
|
account.register_servlets, |
|
|
|
|
login.register_servlets, |
|
|
|
|
synapse.rest.admin.register_servlets_for_client_rest_resource, |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
def make_homeserver(self, reactor, clock): |
|
|
|
|
config = self.default_config() |
|
|
|
|
|
|
|
|
|
# Email config. |
|
|
|
|
self.email_attempts = [] |
|
|
|
|
|
|
|
|
|
def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs): |
|
|
|
|
self.email_attempts.append(msg) |
|
|
|
|
|
|
|
|
|
config["email"] = { |
|
|
|
|
"enable_notifs": False, |
|
|
|
|
"template_dir": os.path.abspath( |
|
|
|
|
pkg_resources.resource_filename("synapse", "res/templates") |
|
|
|
|
), |
|
|
|
|
"smtp_host": "127.0.0.1", |
|
|
|
|
"smtp_port": 20, |
|
|
|
|
"require_transport_security": False, |
|
|
|
|
"smtp_user": None, |
|
|
|
|
"smtp_pass": None, |
|
|
|
|
"notif_from": "test@example.com", |
|
|
|
|
} |
|
|
|
|
config["public_baseurl"] = "https://example.com" |
|
|
|
|
|
|
|
|
|
self.hs = self.setup_test_homeserver(config=config, sendmail=sendmail) |
|
|
|
|
return self.hs |
|
|
|
|
|
|
|
|
|
def prepare(self, reactor, clock, hs): |
|
|
|
|
self.store = hs.get_datastore() |
|
|
|
|
|
|
|
|
|
self.user_id = self.register_user("kermit", "test") |
|
|
|
|
self.user_id_tok = self.login("kermit", "test") |
|
|
|
|
self.email = "test@example.com" |
|
|
|
|
self.url_3pid = b"account/3pid" |
|
|
|
|
|
|
|
|
|
def test_add_email(self): |
|
|
|
|
"""Test adding an email to profile |
|
|
|
|
""" |
|
|
|
|
client_secret = "foobar" |
|
|
|
|
session_id = self._request_token(self.email, client_secret) |
|
|
|
|
|
|
|
|
|
self.assertEquals(len(self.email_attempts), 1) |
|
|
|
|
link = self._get_link_from_email() |
|
|
|
|
|
|
|
|
|
self._validate_token(link) |
|
|
|
|
|
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"/_matrix/client/unstable/account/3pid/add", |
|
|
|
|
{ |
|
|
|
|
"client_secret": client_secret, |
|
|
|
|
"sid": session_id, |
|
|
|
|
"auth": { |
|
|
|
|
"type": "m.login.password", |
|
|
|
|
"user": self.user_id, |
|
|
|
|
"password": "test", |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
|
|
|
|
|
# Get user |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"GET", self.url_3pid, access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) |
|
|
|
|
self.assertEqual(self.email, channel.json_body["threepids"][0]["address"]) |
|
|
|
|
|
|
|
|
|
def test_add_email_if_disabled(self): |
|
|
|
|
"""Test adding email to profile when doing so is disallowed |
|
|
|
|
""" |
|
|
|
|
self.hs.config.enable_3pid_changes = False |
|
|
|
|
|
|
|
|
|
client_secret = "foobar" |
|
|
|
|
session_id = self._request_token(self.email, client_secret) |
|
|
|
|
|
|
|
|
|
self.assertEquals(len(self.email_attempts), 1) |
|
|
|
|
link = self._get_link_from_email() |
|
|
|
|
|
|
|
|
|
self._validate_token(link) |
|
|
|
|
|
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"/_matrix/client/unstable/account/3pid/add", |
|
|
|
|
{ |
|
|
|
|
"client_secret": client_secret, |
|
|
|
|
"sid": session_id, |
|
|
|
|
"auth": { |
|
|
|
|
"type": "m.login.password", |
|
|
|
|
"user": self.user_id, |
|
|
|
|
"password": "test", |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) |
|
|
|
|
|
|
|
|
|
# Get user |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"GET", self.url_3pid, access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertFalse(channel.json_body["threepids"]) |
|
|
|
|
|
|
|
|
|
def test_delete_email(self): |
|
|
|
|
"""Test deleting an email from profile |
|
|
|
|
""" |
|
|
|
|
# Add a threepid |
|
|
|
|
self.get_success( |
|
|
|
|
self.store.user_add_threepid( |
|
|
|
|
user_id=self.user_id, |
|
|
|
|
medium="email", |
|
|
|
|
address=self.email, |
|
|
|
|
validated_at=0, |
|
|
|
|
added_at=0, |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"account/3pid/delete", |
|
|
|
|
{"medium": "email", "address": self.email}, |
|
|
|
|
access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
|
|
|
|
|
# Get user |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"GET", self.url_3pid, access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertFalse(channel.json_body["threepids"]) |
|
|
|
|
|
|
|
|
|
def test_delete_email_if_disabled(self): |
|
|
|
|
"""Test deleting an email from profile when disallowed |
|
|
|
|
""" |
|
|
|
|
self.hs.config.enable_3pid_changes = False |
|
|
|
|
|
|
|
|
|
# Add a threepid |
|
|
|
|
self.get_success( |
|
|
|
|
self.store.user_add_threepid( |
|
|
|
|
user_id=self.user_id, |
|
|
|
|
medium="email", |
|
|
|
|
address=self.email, |
|
|
|
|
validated_at=0, |
|
|
|
|
added_at=0, |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"account/3pid/delete", |
|
|
|
|
{"medium": "email", "address": self.email}, |
|
|
|
|
access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) |
|
|
|
|
|
|
|
|
|
# Get user |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"GET", self.url_3pid, access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) |
|
|
|
|
self.assertEqual(self.email, channel.json_body["threepids"][0]["address"]) |
|
|
|
|
|
|
|
|
|
def test_cant_add_email_without_clicking_link(self): |
|
|
|
|
"""Test that we do actually need to click the link in the email |
|
|
|
|
""" |
|
|
|
|
client_secret = "foobar" |
|
|
|
|
session_id = self._request_token(self.email, client_secret) |
|
|
|
|
|
|
|
|
|
self.assertEquals(len(self.email_attempts), 1) |
|
|
|
|
|
|
|
|
|
# Attempt to add email without clicking the link |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"/_matrix/client/unstable/account/3pid/add", |
|
|
|
|
{ |
|
|
|
|
"client_secret": client_secret, |
|
|
|
|
"sid": session_id, |
|
|
|
|
"auth": { |
|
|
|
|
"type": "m.login.password", |
|
|
|
|
"user": self.user_id, |
|
|
|
|
"password": "test", |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertEqual(Codes.THREEPID_AUTH_FAILED, channel.json_body["errcode"]) |
|
|
|
|
|
|
|
|
|
# Get user |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"GET", self.url_3pid, access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertFalse(channel.json_body["threepids"]) |
|
|
|
|
|
|
|
|
|
def test_no_valid_token(self): |
|
|
|
|
"""Test that we do actually need to request a token and can't just |
|
|
|
|
make a session up. |
|
|
|
|
""" |
|
|
|
|
client_secret = "foobar" |
|
|
|
|
session_id = "weasle" |
|
|
|
|
|
|
|
|
|
# Attempt to add email without even requesting an email |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"/_matrix/client/unstable/account/3pid/add", |
|
|
|
|
{ |
|
|
|
|
"client_secret": client_secret, |
|
|
|
|
"sid": session_id, |
|
|
|
|
"auth": { |
|
|
|
|
"type": "m.login.password", |
|
|
|
|
"user": self.user_id, |
|
|
|
|
"password": "test", |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertEqual(Codes.THREEPID_AUTH_FAILED, channel.json_body["errcode"]) |
|
|
|
|
|
|
|
|
|
# Get user |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"GET", self.url_3pid, access_token=self.user_id_tok, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
|
|
|
|
|
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) |
|
|
|
|
self.assertFalse(channel.json_body["threepids"]) |
|
|
|
|
|
|
|
|
|
def _request_token(self, email, client_secret): |
|
|
|
|
request, channel = self.make_request( |
|
|
|
|
"POST", |
|
|
|
|
b"account/3pid/email/requestToken", |
|
|
|
|
{"client_secret": client_secret, "email": email, "send_attempt": 1}, |
|
|
|
|
) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEquals(200, channel.code, channel.result) |
|
|
|
|
|
|
|
|
|
return channel.json_body["sid"] |
|
|
|
|
|
|
|
|
|
def _validate_token(self, link): |
|
|
|
|
# Remove the host |
|
|
|
|
path = link.replace("https://example.com", "") |
|
|
|
|
|
|
|
|
|
request, channel = self.make_request("GET", path, shorthand=False) |
|
|
|
|
self.render(request) |
|
|
|
|
self.assertEquals(200, channel.code, channel.result) |
|
|
|
|
|
|
|
|
|
def _get_link_from_email(self): |
|
|
|
|
assert self.email_attempts, "No emails have been sent" |
|
|
|
|
|
|
|
|
|
raw_msg = self.email_attempts[-1].decode("UTF-8") |
|
|
|
|
mail = Parser().parsestr(raw_msg) |
|
|
|
|
|
|
|
|
|
text = None |
|
|
|
|
for part in mail.walk(): |
|
|
|
|
if part.get_content_type() == "text/plain": |
|
|
|
|
text = part.get_payload(decode=True).decode("UTF-8") |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
if not text: |
|
|
|
|
self.fail("Could not find text portion of email to parse") |
|
|
|
|
|
|
|
|
|
match = re.search(r"https://example.com\S+", text) |
|
|
|
|
assert match, "Could not find link in email" |
|
|
|
|
|
|
|
|
|
return match.group(0) |
|
|
|
|