From 986615b0b21271959adb9d64291761244e4175bd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 1 Aug 2016 18:02:07 +0100 Subject: [PATCH 01/24] Move e2e query logic into a handler --- synapse/handlers/e2e_keys.py | 67 ++++++++++++++++++++++++++++ synapse/rest/client/v2_alpha/keys.py | 46 +++---------------- synapse/server.py | 45 ++++++++++--------- synapse/server.pyi | 4 ++ 4 files changed, 102 insertions(+), 60 deletions(-) create mode 100644 synapse/handlers/e2e_keys.py diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py new file mode 100644 index 000000000..73a14cf95 --- /dev/null +++ b/synapse/handlers/e2e_keys.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging + +from twisted.internet import defer + +import synapse.types +from ._base import BaseHandler + +logger = logging.getLogger(__name__) + + +class E2eKeysHandler(BaseHandler): + def __init__(self, hs): + super(E2eKeysHandler, self).__init__(hs) + self.store = hs.get_datastore() + self.federation = hs.get_replication_layer() + self.is_mine = hs.is_mine + + @defer.inlineCallbacks + def query_devices(self, query_body): + local_query = [] + remote_queries = {} + for user_id, device_ids in query_body.get("device_keys", {}).items(): + user = synapse.types.UserID.from_string(user_id) + if self.is_mine(user): + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + else: + remote_queries.setdefault(user.domain, {})[user_id] = list( + device_ids + ) + results = yield self.store.get_e2e_device_keys(local_query) + + json_result = {} + for user_id, device_keys in results.items(): + for device_id, json_bytes in device_keys.items(): + json_result.setdefault(user_id, {})[ + device_id] = json.loads( + json_bytes + ) + + for destination, device_keys in remote_queries.items(): + remote_result = yield self.federation.query_client_keys( + destination, {"device_keys": device_keys} + ) + for user_id, keys in remote_result["device_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys + defer.returnValue((200, {"device_keys": json_result})) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index dc1d4d8fc..705a0b6c1 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -186,17 +186,19 @@ class KeyQueryServlet(RestServlet): ) def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ super(KeyQueryServlet, self).__init__() - self.store = hs.get_datastore() self.auth = hs.get_auth() - self.federation = hs.get_replication_layer() - self.is_mine = hs.is_mine + self.e2e_keys_handler = hs.get_e2e_keys_handler() @defer.inlineCallbacks def on_POST(self, request, user_id, device_id): yield self.auth.get_user_by_req(request) body = parse_json_object_from_request(request) - result = yield self.handle_request(body) + result = 
yield self.e2e_keys_handler.query_devices(body) defer.returnValue(result) @defer.inlineCallbacks @@ -205,45 +207,11 @@ class KeyQueryServlet(RestServlet): auth_user_id = requester.user.to_string() user_id = user_id if user_id else auth_user_id device_ids = [device_id] if device_id else [] - result = yield self.handle_request( + result = yield self.e2e_keys_handler.query_devices( {"device_keys": {user_id: device_ids}} ) defer.returnValue(result) - @defer.inlineCallbacks - def handle_request(self, body): - local_query = [] - remote_queries = {} - for user_id, device_ids in body.get("device_keys", {}).items(): - user = UserID.from_string(user_id) - if self.is_mine(user): - if not device_ids: - local_query.append((user_id, None)) - else: - for device_id in device_ids: - local_query.append((user_id, device_id)) - else: - remote_queries.setdefault(user.domain, {})[user_id] = list( - device_ids - ) - results = yield self.store.get_e2e_device_keys(local_query) - - json_result = {} - for user_id, device_keys in results.items(): - for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[device_id] = json.loads( - json_bytes - ) - - for destination, device_keys in remote_queries.items(): - remote_result = yield self.federation.query_client_keys( - destination, {"device_keys": device_keys} - ) - for user_id, keys in remote_result["device_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys - defer.returnValue((200, {"device_keys": json_result})) - class OneTimeKeyServlet(RestServlet): """ diff --git a/synapse/server.py b/synapse/server.py index e8b166990..6bb498830 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -19,39 +19,38 @@ # partial one for unit test mocking. # Imports required for the default HomeServer() implementation -from twisted.web.client import BrowserLikePolicyForHTTPS +import logging + from twisted.enterprise import adbapi +from twisted.web.client import BrowserLikePolicyForHTTPS -from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.api.auth import Auth +from synapse.api.filtering import Filtering +from synapse.api.ratelimiting import Ratelimiter from synapse.appservice.api import ApplicationServiceApi +from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.crypto.keyring import Keyring +from synapse.events.builder import EventBuilderFactory from synapse.federation import initialize_http_replication -from synapse.handlers.device import DeviceHandler -from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory -from synapse.notifier import Notifier -from synapse.api.auth import Auth from synapse.handlers import Handlers +from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.handlers.auth import AuthHandler +from synapse.handlers.device import DeviceHandler +from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.presence import PresenceHandler +from synapse.handlers.room import RoomListHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import TypingHandler -from synapse.handlers.room import RoomListHandler -from synapse.handlers.auth import AuthHandler -from synapse.handlers.appservice import ApplicationServicesHandler +from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory +from synapse.http.matrixfederationclient import MatrixFederationHttpClient +from synapse.notifier import Notifier +from synapse.push.pusherpool import PusherPool 
+from synapse.rest.media.v1.media_repository import MediaRepository from synapse.state import StateHandler from synapse.storage import DataStore +from synapse.streams.events import EventSources from synapse.util import Clock from synapse.util.distributor import Distributor -from synapse.streams.events import EventSources -from synapse.api.ratelimiting import Ratelimiter -from synapse.crypto.keyring import Keyring -from synapse.push.pusherpool import PusherPool -from synapse.events.builder import EventBuilderFactory -from synapse.api.filtering import Filtering -from synapse.rest.media.v1.media_repository import MediaRepository - -from synapse.http.matrixfederationclient import MatrixFederationHttpClient - -import logging - logger = logging.getLogger(__name__) @@ -94,6 +93,7 @@ class HomeServer(object): 'room_list_handler', 'auth_handler', 'device_handler', + 'e2e_keys_handler', 'application_service_api', 'application_service_scheduler', 'application_service_handler', @@ -202,6 +202,9 @@ class HomeServer(object): def build_device_handler(self): return DeviceHandler(self) + def build_e2e_keys_handler(self): + return E2eKeysHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) diff --git a/synapse/server.pyi b/synapse/server.pyi index 902f725c0..c0aa868c4 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -1,6 +1,7 @@ import synapse.handlers import synapse.handlers.auth import synapse.handlers.device +import synapse.handlers.e2e_keys import synapse.storage import synapse.state @@ -14,6 +15,9 @@ class HomeServer(object): def get_device_handler(self) -> synapse.handlers.device.DeviceHandler: pass + def get_e2e_keys_handler(self) -> synapse.handlers.e2e_keys.E2eKeysHandler: + pass + def get_handlers(self) -> synapse.handlers.Handlers: pass From 1efee2f52b931ddcd90e87d06c7ea614da2c9cd0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 2 Aug 2016 18:06:31 +0100 Subject: [PATCH 02/24] E2E keys: Make federation query share code with client query Refactor the e2e query handler to separate out the local query, and then make the federation handler use it. 
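In outline, the shared path looks like this (a simplified sketch based on the diff below; query_local_devices is the shared helper, and the full implementation lives in synapse/handlers/e2e_keys.py):

    from twisted.internet import defer

    class QuerySketch(object):
        """Illustrative only: the real E2eKeysHandler is in the diff below.

        The client path (query_devices) splits a query by domain, using
        query_local_devices() for our own users and
        federation.query_client_keys() for remote ones.
        """

        @defer.inlineCallbacks
        def on_federation_query_client_keys(self, query_body):
            # the federation path reuses the same local-query helper as the
            # client path, with no remote fan-out
            res = yield self.query_local_devices(
                query_body.get("device_keys", {})
            )
            defer.returnValue({"device_keys": res})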
---
 synapse/federation/federation_server.py |  20 +----
 synapse/federation/transport/server.py  |   4 +-
 synapse/handlers/e2e_keys.py            | 115 ++++++++++++++++++------
 3 files changed, 92 insertions(+), 47 deletions(-)

diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 85f5e752f..e637f2a8b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -348,27 +348,9 @@ class FederationServer(FederationBase):
             (200, send_content)
         )
 
-    @defer.inlineCallbacks
     @log_function
     def on_query_client_keys(self, origin, content):
-        query = []
-        for user_id, device_ids in content.get("device_keys", {}).items():
-            if not device_ids:
-                query.append((user_id, None))
-            else:
-                for device_id in device_ids:
-                    query.append((user_id, device_id))
-
-        results = yield self.store.get_e2e_device_keys(query)
-
-        json_result = {}
-        for user_id, device_keys in results.items():
-            for device_id, json_bytes in device_keys.items():
-                json_result.setdefault(user_id, {})[device_id] = json.loads(
-                    json_bytes
-                )
-
-        defer.returnValue({"device_keys": json_result})
+        return self.on_query_request("client_keys", content)
 
     @defer.inlineCallbacks
     @log_function
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 26fa88ae8..1a88413d1 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -367,10 +367,8 @@ class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
 class FederationClientKeysQueryServlet(BaseFederationServlet):
     PATH = "/user/keys/query"
 
-    @defer.inlineCallbacks
     def on_POST(self, origin, content, query):
-        response = yield self.handler.on_query_client_keys(origin, content)
-        defer.returnValue((200, response))
+        return self.handler.on_query_client_keys(origin, content)
 
 
 class FederationClientKeysClaimServlet(BaseFederationServlet):
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 73a14cf95..9c7e9494d 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -13,12 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import collections
 import json
 import logging
 
 from twisted.internet import defer
 
+from synapse.api import errors
 import synapse.types
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -29,39 +32,101 @@ class E2eKeysHandler(BaseHandler):
         super(E2eKeysHandler, self).__init__(hs)
         self.store = hs.get_datastore()
         self.federation = hs.get_replication_layer()
-        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
+
+        # doesn't really work as part of the generic query API, because the
+        # query request requires an object POST, but we abuse the
+        # "query handler" interface.
+        self.federation.register_query_handler(
+            "client_keys", self.on_federation_query_client_keys
+        )
 
     @defer.inlineCallbacks
     def query_devices(self, query_body):
-        local_query = []
-        remote_queries = {}
-        for user_id, device_ids in query_body.get("device_keys", {}).items():
+        """ Handle a device key query from a client
+
+        {
+            "device_keys": {
+                "<user_id>": ["<device_id>"]
+            }
+        }
+        ->
+        {
+            "device_keys": {
+                "<user_id>": {
+                    "<device_id>": {
+                        ...
+                    }
+                }
+            }
+        }
+        """
+        device_keys_query = query_body.get("device_keys", {})
+
+        # separate users by domain.
+ # make a map from domain to user_id to device_ids + queries_by_domain = collections.defaultdict(dict) + for user_id, device_ids in device_keys_query.items(): user = synapse.types.UserID.from_string(user_id) - if self.is_mine(user): - if not device_ids: - local_query.append((user_id, None)) - else: - for device_id in device_ids: - local_query.append((user_id, device_id)) + queries_by_domain[user.domain][user_id] = device_ids + + # do the queries + # TODO: do these in parallel + results = {} + for destination, destination_query in queries_by_domain.items(): + if destination == self.hs.hostname: + res = yield self.query_local_devices(destination_query) else: - remote_queries.setdefault(user.domain, {})[user_id] = list( - device_ids + res = yield self.federation.query_client_keys( + destination, {"device_keys": destination_query} ) + res = res["device_keys"] + for user_id, keys in res.items(): + if user_id in destination_query: + results[user_id] = keys + + defer.returnValue((200, {"device_keys": results})) + + @defer.inlineCallbacks + def query_local_devices(self, query): + """Get E2E device keys for local users + + Args: + query (dict[string, list[string]|None): map from user_id to a list + of devices to query (None for all devices) + + Returns: + defer.Deferred: (resolves to dict[string, dict[string, dict]]): + map from user_id -> device_id -> device details + """ + local_query = [] + + for user_id, device_ids in query.items(): + if not self.is_mine_id(user_id): + logger.warning("Request for keys for non-local user %s", + user_id) + raise errors.SynapseError(400, "Not a user here") + + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + results = yield self.store.get_e2e_device_keys(local_query) - json_result = {} + # un-jsonify the results + json_result = collections.defaultdict(dict) for user_id, device_keys in results.items(): for device_id, json_bytes in device_keys.items(): - json_result.setdefault(user_id, {})[ - device_id] = json.loads( - json_bytes - ) + json_result[user_id][device_id] = json.loads(json_bytes) - for destination, device_keys in remote_queries.items(): - remote_result = yield self.federation.query_client_keys( - destination, {"device_keys": device_keys} - ) - for user_id, keys in remote_result["device_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys - defer.returnValue((200, {"device_keys": json_result})) + defer.returnValue(json_result) + + @defer.inlineCallbacks + def on_federation_query_client_keys(self, query_body): + """ Handle a device key query from a federated server + """ + device_keys_query = query_body.get("device_keys", {}) + res = yield self.query_local_devices(device_keys_query) + defer.returnValue({"device_keys": res}) From 9a2f296fa2e6bd42f10b12a81dba2279b8482fcc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 2 Aug 2016 20:42:30 +0100 Subject: [PATCH 03/24] Factor out some of the code shared between the sytest scripts (#974) * Factor out some of the code shared between the different sytest jenkins scripts * Exclude jenkins from the MANIFEST * Fix dendron build * Missing new line * Poke jenkins * Export the PORT_BASE and PORT_COUNT * Poke jenkins --- MANIFEST.in | 2 ++ jenkins-dendron-postgres.sh | 41 ++++++++----------------------------- jenkins-postgres.sh | 29 ++++++++------------------ jenkins-sqlite.sh | 25 ++++++++-------------- jenkins/clone.sh | 24 ++++++++++++++++++++++ jenkins/prepare_synapse.sh | 19 +++++++++++++++++ 6 
files changed, 71 insertions(+), 69 deletions(-) create mode 100755 jenkins/clone.sh create mode 100755 jenkins/prepare_synapse.sh diff --git a/MANIFEST.in b/MANIFEST.in index 216df265b..981698143 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -24,5 +24,7 @@ recursive-include synapse/static *.js exclude jenkins.sh exclude jenkins*.sh +exclude jenkins* +recursive-exclude jenkins *.sh prune demo/etc diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index f715cd559..e6e94cc8b 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -22,24 +22,10 @@ export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished w rm .coverage* || echo "No coverage files to remove" -tox --notest -e py27 +./jenkins/prepare_synapse.sh -TOX_BIN=$WORKSPACE/.tox/py27/bin -python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install -$TOX_BIN/pip install psycopg2 -$TOX_BIN/pip install lxml - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -if [[ ! -e .dendron-base ]]; then - git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror -else - (cd .dendron-base; git fetch -p) -fi - -rm -rf dendron -git clone .dendron-base dendron --shared -cd dendron +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git +./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git : ${GOPATH:=${WORKSPACE}/.gopath} if [[ "${GOPATH}" != *:* ]]; then @@ -48,35 +34,26 @@ if [[ "${GOPATH}" != *:* ]]; then fi export GOPATH -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) +cd dendron go get github.com/constabulary/gb/... gb generate gb build -cd .. - - -if [[ ! -e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) +cd ../sytest : ${PORT_BASE:=20000} : ${PORT_COUNT=100} +export PORT_BASE +export PORT_COUNT ./jenkins/prep_sytest_for_postgres.sh mkdir -p var echo >&2 "Running sytest with PostgreSQL"; + +TOX_BIN=$WORKSPACE/.tox/py27/bin ./jenkins/install_and_run.sh --python $TOX_BIN/python \ --synapse-directory $WORKSPACE \ --dendron $WORKSPACE/dendron/bin/dendron \ diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 7a43df0d5..edf61a45b 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -22,37 +22,26 @@ export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished w rm .coverage* || echo "No coverage files to remove" -tox --notest -e py27 +./jenkins/prepare_synapse.sh -TOX_BIN=$WORKSPACE/.tox/py27/bin -python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install -$TOX_BIN/pip install psycopg2 -$TOX_BIN/pip install lxml - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -if [[ ! 
-e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest - -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git : ${PORT_BASE:=20000} : ${PORT_COUNT=100} +export PORT_BASE +export PORT_COUNT + +cd sytest ./jenkins/prep_sytest_for_postgres.sh echo >&2 "Running sytest with PostgreSQL"; + +TOX_BIN=$WORKSPACE/.tox/py27/bin ./jenkins/install_and_run.sh --coverage \ --python $TOX_BIN/python \ --synapse-directory $WORKSPACE \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \ + --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) cd .. cp sytest/.coverage.* . diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index 27e61af6e..1c3530ebb 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -4,6 +4,7 @@ set -eux : ${WORKSPACE:="$(pwd)"} +export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 @@ -22,28 +23,18 @@ export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished w rm .coverage* || echo "No coverage files to remove" -tox --notest -e py27 -TOX_BIN=$WORKSPACE/.tox/py27/bin -python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install -$TOX_BIN/pip install lxml - -: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} - -if [[ ! -e .sytest-base ]]; then - git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror -else - (cd .sytest-base; git fetch -p) -fi - -rm -rf sytest -git clone .sytest-base sytest --shared -cd sytest +./jenkins/prepare_synapse.sh -git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) +./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git : ${PORT_BASE:=20000} : ${PORT_COUNT=100} +export PORT_BASE +export PORT_COUNT + +cd sytest +TOX_BIN=$WORKSPACE/.tox/py27/bin ./jenkins/install_and_run.sh --coverage \ --python $TOX_BIN/python \ --synapse-directory $WORKSPACE \ diff --git a/jenkins/clone.sh b/jenkins/clone.sh new file mode 100755 index 000000000..f56d076ea --- /dev/null +++ b/jenkins/clone.sh @@ -0,0 +1,24 @@ +#! /bin/bash + +NAME=$1 +PROJECT=$2 +BASE=".$NAME-base" + +# update our clone +if [ ! -d .$NAME-base ]; then + git clone $PROJECT $BASE --mirror +else + (cd $BASE; git fetch -p) +fi + +rm -rf $NAME +git clone $BASE $NAME --shared + +: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} +cd $NAME +# check out the relevant branch +git checkout "${GIT_BRANCH}" || ( + echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" + git checkout "origin/develop" +) +git clean -df . diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh new file mode 100755 index 000000000..237223c81 --- /dev/null +++ b/jenkins/prepare_synapse.sh @@ -0,0 +1,19 @@ +#! /bin/bash + +cd "`dirname $0`/.." + +TOX_DIR=$WORKSPACE/.tox + +mkdir -p $TOX_DIR + +if ! 
[ $TOX_DIR -ef .tox ]; then + ln -s "$TOX_DIR" .tox +fi + +# set up the virtualenv +tox -e py27 --notest -v + +TOX_BIN=$TOX_DIR/py27/bin +python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install +$TOX_BIN/pip install lxml +$TOX_BIN/pip install psycopg2 From a8a32d2714705d679584c5e10dfa79d9b4a8a76f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 11:23:39 +0100 Subject: [PATCH 04/24] Ensure we only persist an event once at a time --- synapse/storage/events.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c63ca36df..670aa8f11 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from canonicaljson import encode_canonical_json -from collections import deque, namedtuple +from collections import deque, namedtuple, OrderedDict import synapse import synapse.metrics @@ -403,6 +403,23 @@ class EventsStore(SQLBaseStore): and the rejections table. Things reading from those table will need to check whether the event was rejected. """ + # Ensure that we don't have the same event twice. + # Pick the earliest non-outlier if there is one, else the earliest one. + new_events_and_contexts = OrderedDict() + for event, context in events_and_contexts: + prev_event_context = new_events_and_contexts.get(event.event_id) + if prev_event_context: + if not event.internal_metadata.is_outlier(): + if prev_event_context[0].internal_metadata.is_outlier(): + # To ensure correct ordering we pop, as OrderedDict is + # ordered by first insertion. + new_events_and_contexts.pop(event.event_id, None) + new_events_and_contexts[event.event_id] = (event, context) + else: + new_events_and_contexts[event.event_id] = (event, context) + + events_and_contexts = new_events_and_contexts.values() + depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids From 4fec5e57be72e5374342637b4062aeff0df6adc3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Aug 2016 11:39:39 +0100 Subject: [PATCH 05/24] Default device_display_name to null It turns out that it's more useful to return a null device display name (and let clients decide how to handle it: eg, falling back to device_id) than using a constant string like "unknown device". --- synapse/handlers/device.py | 2 +- synapse/rest/client/v2_alpha/keys.py | 4 +--- .../schema/delta/33/devices_for_e2e_keys.sql | 2 +- ...ices_for_e2e_keys_clear_unknown_device.sql | 20 +++++++++++++++++++ 4 files changed, 23 insertions(+), 5 deletions(-) create mode 100644 synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f4bf159bb..fcbe7f8e6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,7 +29,7 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, - initial_device_display_name): + initial_device_display_name = None): """ If the given device has not been registered, register it with the supplied display name. 
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index dc1d4d8fc..5fa33acee 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -130,9 +130,7 @@ class KeyUploadServlet(RestServlet): # old access_token without an associated device_id. Either way, we # need to double-check the device is registered to avoid ending up with # keys without a corresponding device. - self.device_handler.check_device_registered( - user_id, device_id, "unknown device" - ) + self.device_handler.check_device_registered(user_id, device_id) result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql index 140f2b63e..aa4a3b9f2 100644 --- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -16,4 +16,4 @@ -- make sure that we have a device record for each set of E2E keys, so that the -- user can delete them if they like. INSERT INTO devices - SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json; + SELECT user_id, device_id, NULL FROM e2e_device_keys_json; diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql new file mode 100644 index 000000000..667157339 --- /dev/null +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys_clear_unknown_device.sql @@ -0,0 +1,20 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- a previous version of the "devices_for_e2e_keys" delta set all the device +-- names to "unknown device". This wasn't terribly helpful +UPDATE devices + SET display_name = NULL + WHERE display_name = 'unknown device'; From 80ad7102176a3b24ab589b6eb4aa42b872f054da Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 13:22:26 +0100 Subject: [PATCH 06/24] Remove other bit of deduplication --- synapse/storage/events.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 670aa8f11..4664cfe6d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -450,8 +450,6 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } - # Remove the events that we've seen before. - event_map = {} to_remove = set() for event, context in events_and_contexts: if context.rejected: @@ -462,23 +460,6 @@ class EventsStore(SQLBaseStore): to_remove.add(event) continue - # Handle the case of the list including the same event multiple - # times. The tricky thing here is when they differ by whether - # they are an outlier. 
- if event.event_id in event_map: - other = event_map[event.event_id] - - if not other.internal_metadata.is_outlier(): - to_remove.add(event) - continue - elif not event.internal_metadata.is_outlier(): - to_remove.add(event) - continue - else: - to_remove.add(other) - - event_map[event.event_id] = event - if event.event_id not in have_persisted: continue From 97f072db74e6442c5eb38eb1d6ae84a59d504391 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 13:46:47 +0100 Subject: [PATCH 07/24] Print status code in federation_client.py --- scripts-dev/federation_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index caa3cee4e..59c3dce3d 100644 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -128,6 +128,7 @@ def get_json(origin_name, origin_key, destination, path): headers={"Authorization": authorization_headers[0]}, verify=False, ) + sys.stderr.write("Status Code: %d\n" % (result.status_code,)) return result.json() From a843868fe942aaa9d32fe858476d1b459813a854 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Aug 2016 14:24:33 +0100 Subject: [PATCH 08/24] E2eKeysHandler: minor tweaks PR feedback --- synapse/handlers/e2e_keys.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 9c7e9494d..1312cdf5a 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -22,17 +22,15 @@ from twisted.internet import defer from synapse.api import errors import synapse.types -from ._base import BaseHandler - logger = logging.getLogger(__name__) -class E2eKeysHandler(BaseHandler): +class E2eKeysHandler(object): def __init__(self, hs): - super(E2eKeysHandler, self).__init__(hs) self.store = hs.get_datastore() self.federation = hs.get_replication_layer() self.is_mine_id = hs.is_mine_id + self.server_name = hs.hostname # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the @@ -74,7 +72,7 @@ class E2eKeysHandler(BaseHandler): # TODO: do these in parallel results = {} for destination, destination_query in queries_by_domain.items(): - if destination == self.hs.hostname: + if destination == self.server_name: res = yield self.query_local_devices(destination_query) else: res = yield self.federation.query_client_keys( From a6f5cc65d9c4b1b1adf909355c03e72c456627a6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Aug 2016 14:30:06 +0100 Subject: [PATCH 09/24] PEP8 --- synapse/handlers/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index fcbe7f8e6..8d630c6b1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -29,7 +29,7 @@ class DeviceHandler(BaseHandler): @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, - initial_device_display_name = None): + initial_device_display_name=None): """ If the given device has not been registered, register it with the supplied display name. From e3a720217a9d200a7c3db8305df53ef8bf76f565 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 14:47:37 +0100 Subject: [PATCH 10/24] Add /state_ids federation API The new API only returns the event_ids for the state, as most requesters will already have the vast majority of the events already. 
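For illustration, the two response shapes differ roughly as follows (hypothetical event IDs; as of this patch the response fields are pdus/auth_chain, renamed to pdu_ids/auth_chain_ids later in this series). Callers fall back to the full /state API when the remote server returns a 404, as older servers will:

    # GET /_matrix/federation/v1/state/{roomId} -- full event JSON,
    # potentially very large for big rooms:
    state_response = {
        "pdus": [
            {"event_id": "$ev1:example.com"},  # full PDU fields elided
        ],
        "auth_chain": [
            {"event_id": "$ev0:example.com"},  # full PDU fields elided
        ],
    }

    # GET /_matrix/federation/v1/state_ids/{roomId} -- just the IDs; the
    # caller then fetches only the events it is actually missing:
    state_ids_response = {
        "pdus": ["$ev1:example.com", "$ev2:example.com"],
        "auth_chain": ["$ev0:example.com"],
    }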
--- synapse/federation/federation_client.py | 73 ++++++++++++++++++++++++- synapse/federation/federation_server.py | 21 +++++++ synapse/federation/transport/client.py | 22 ++++++++ synapse/federation/transport/server.py | 12 ++++ 4 files changed, 125 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b06387051..03f6133e6 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -314,9 +314,32 @@ class FederationClient(FederationBase): Deferred: Results in a list of PDUs. """ - result = yield self.transport_layer.get_room_state( - destination, room_id, event_id=event_id, - ) + try: + # First we try and ask for just the IDs, as thats far quicker if + # we have most of the state and auth_chain already. + # However, this may 404 if the other side has an old synapse. + result = yield self.transport_layer.get_room_state_ids( + destination, room_id, event_id=event_id, + ) + + state_event_ids = result["pdus"] + auth_event_ids = result.get("auth_chain", []) + + event_map, _failed_to_fetch = yield self.get_events( + [destination], room_id, set(state_event_ids + auth_event_ids) + ) + + pdus = [event_map[e_id] for e_id in state_event_ids] + auth_chain = [event_map[e_id] for e_id in auth_event_ids] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue((pdus, auth_chain)) + except HttpResponseException as e: + if e.code == 404: + logger.info("Failed to use get_room_state_ids API, falling back") + else: + raise e pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] @@ -339,6 +362,50 @@ class FederationClient(FederationBase): defer.returnValue((signed_pdus, signed_auth)) + @defer.inlineCallbacks + def get_events(self, destinations, room_id, event_ids, return_local=True): + if return_local: + seen_events = yield self.store.get_events(event_ids) + signed_events = seen_events.values() + else: + seen_events = yield self.store.have_events(event_ids) + signed_events = [] + + failed_to_fetch = [] + + missing_events = set(event_ids) + for k in seen_events: + missing_events.discard(k) + + if not missing_events: + defer.returnValue((signed_events, failed_to_fetch)) + + def random_server_list(): + srvs = list(destinations) + random.shuffle(srvs) + return srvs + + batch_size = 20 + for i in xrange(0, len(missing_events), batch_size): + batch = missing_events[i:i + batch_size] + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ).addBoth(lambda r, e: (r, e), e_id) + for e_id in batch + ] + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in res: + if result and val: + signed_events.append(val) + else: + failed_to_fetch.add(e_id) + + defer.returnValue((signed_events, failed_to_fetch)) + @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 612d274bd..40e9fda0e 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -214,6 +214,27 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) + @defer.inlineCallbacks + def on_state_ids_request(self, origin, room_id, event_id): + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + pdus = yield 
self.handler.get_state_for_pdu(
+            room_id, event_id,
+        )
+        auth_chain = yield self.store.get_auth_chain(
+            [pdu.event_id for pdu in pdus]
+        )
+
+        defer.returnValue((200, {
+            "pdus": [pdu.event_id for pdu in pdus],
+            "auth_chain": [pdu.event_id for pdu in auth_chain],
+        }))
+
     @defer.inlineCallbacks
     def _on_context_state_request_compute(self, room_id, event_id):
         pdus = yield self.handler.get_state_for_pdu(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ebb698e27..3d088e43c 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -54,6 +54,28 @@ class TransportLayerClient(object):
             destination, path=path, args={"event_id": event_id},
         )
 
+    @log_function
+    def get_room_state_ids(self, destination, room_id, event_id):
+        """ Requests all state for a given room from the given server at the
+        given event. Returns the state's event_id's
+
+        Args:
+            destination (str): The host name of the remote home server we want
+                to get the state from.
+            context (str): The name of the context we want the state of
+            event_id (str): The event we want the context at.
+
+        Returns:
+            Deferred: Results in a dict received from the remote homeserver.
+        """
+        logger.debug("get_room_state_ids dest=%s, room=%s",
+                     destination, room_id)
+
+        path = PREFIX + "/state_ids/%s/" % room_id
+        return self.client.get_json(
+            destination, path=path, args={"event_id": event_id},
+        )
+
     @log_function
     def get_event(self, destination, event_id, timeout=None):
         """ Requests the pdu with give id and origin from the given server.
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 26fa88ae8..3ae7c4845 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet):
         )
 
 
+class FederationStateIdsServlet(BaseFederationServlet):
+    PATH = "/state_ids/(?P<room_id>[^/]*)/"
+
+    def on_GET(self, origin, content, query, room_id):
+        return self.handler.on_state_ids_request(
+            origin,
+            room_id,
+            query.get("event_id", [None])[0],
+        )
+
+
 class FederationBackfillServlet(BaseFederationServlet):
     PATH = "/backfill/(?P[^/]*)/"
 
@@ -538,6 +549,7 @@ SERVLET_CLASSES = (
     FederationPullServlet,
     FederationEventServlet,
     FederationStateServlet,
+    FederationStateIdsServlet,
     FederationBackfillServlet,
     FederationQueryServlet,
     FederationMakeJoinServlet,

From a60a2eaa02f454dbc450cf821f6cd1c6b0b93993 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 3 Aug 2016 14:52:43 +0100
Subject: [PATCH 11/24] Comment

---
 synapse/federation/federation_client.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 03f6133e6..0491f1c3f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -364,6 +364,20 @@ class FederationClient(FederationBase):
 
     @defer.inlineCallbacks
     def get_events(self, destinations, room_id, event_ids, return_local=True):
+        """Fetch events from some remote destinations, checking if we already
+        have them.
+
+        Args:
+            destinations (list)
+            room_id (str)
+            event_ids (list)
+            return_local (bool): Whether to include events we already have in
+                the DB in the returned list of events
+
+        Returns:
+            Deferred: A deferred resolving to a 2-tuple where the first is a list of
+            events and the second is a list of event ids that we failed to fetch.
+ """ if return_local: seen_events = yield self.store.get_events(event_ids) signed_events = seen_events.values() From 520ee9bd2c91a75eb1dc7ed923016967856c6bdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:03:15 +0100 Subject: [PATCH 12/24] Fix syntax error --- synapse/federation/federation_client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0491f1c3f..6c626cf12 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -325,10 +325,17 @@ class FederationClient(FederationBase): state_event_ids = result["pdus"] auth_event_ids = result.get("auth_chain", []) - event_map, _failed_to_fetch = yield self.get_events( + fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) ) + if failed_to_fetch: + logger.warn("Failed to get %r", failed_to_fetch) + + event_map = { + ev.event_id: ev for ev in fetched_events + } + pdus = [event_map[e_id] for e_id in state_event_ids] auth_chain = [event_map[e_id] for e_id in auth_event_ids] From 4c56bedee3bb63d7035fca4a1a092b11de0b18cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 15:04:29 +0100 Subject: [PATCH 13/24] Actually call get_room_state --- synapse/federation/federation_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6c626cf12..7eadcdd28 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -348,6 +348,10 @@ class FederationClient(FederationBase): else: raise e + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, + ) + pdus = [ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] ] From 91fa69e0299167dad4df9831b8d175a99564b266 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Aug 2016 11:48:32 +0100 Subject: [PATCH 14/24] keys/query: return all users which were asked for In the situation where all of a user's devices get deleted, we want to indicate this to a client, so we want to return an empty dictionary, rather than nothing at all. 
--- synapse/handlers/e2e_keys.py | 9 ++++--- tests/handlers/test_e2e_keys.py | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 tests/handlers/test_e2e_keys.py diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 1312cdf5a..950fc927b 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -99,6 +99,7 @@ class E2eKeysHandler(object): """ local_query = [] + result_dict = {} for user_id, device_ids in query.items(): if not self.is_mine_id(user_id): logger.warning("Request for keys for non-local user %s", @@ -111,15 +112,17 @@ class E2eKeysHandler(object): for device_id in device_ids: local_query.append((user_id, device_id)) + # make sure that each queried user appears in the result dict + result_dict[user_id] = {} + results = yield self.store.get_e2e_device_keys(local_query) # un-jsonify the results - json_result = collections.defaultdict(dict) for user_id, device_keys in results.items(): for device_id, json_bytes in device_keys.items(): - json_result[user_id][device_id] = json.loads(json_bytes) + result_dict[user_id][device_id] = json.loads(json_bytes) - defer.returnValue(json_result) + defer.returnValue(result_dict) @defer.inlineCallbacks def on_federation_query_client_keys(self, query_body): diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py new file mode 100644 index 000000000..878a54dc3 --- /dev/null +++ b/tests/handlers/test_e2e_keys.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from twisted.internet import defer + +import synapse.api.errors +import synapse.handlers.e2e_keys + +import synapse.storage +from tests import unittest, utils + + +class E2eKeysHandlerTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(E2eKeysHandlerTestCase, self).__init__(*args, **kwargs) + self.hs = None # type: synapse.server.HomeServer + self.handler = None # type: synapse.handlers.e2e_keys.E2eKeysHandler + + @defer.inlineCallbacks + def setUp(self): + self.hs = yield utils.setup_test_homeserver( + handlers=None, + replication_layer=mock.Mock(), + ) + self.handler = synapse.handlers.e2e_keys.E2eKeysHandler(self.hs) + + @defer.inlineCallbacks + def test_query_local_devices_no_devices(self): + """If the user has no devices, we expect an empty list. + """ + local_user = "@boris:" + self.hs.hostname + res = yield self.handler.query_local_devices({local_user: None}) + self.assertDictEqual(res, {local_user: {}}) From 68264d7404214d30d32310991c89ea4a234c319a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Aug 2016 07:46:57 +0100 Subject: [PATCH 15/24] Include device name in /keys/query response Add an 'unsigned' section which includes the device display name. 
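Each entry in the /keys/query response gains an "unsigned" block alongside the key JSON the device uploaded, along these lines (user ID, device ID and key contents are illustrative):

    response = {
        "device_keys": {
            "@alice:example.com": {
                "JLAFKJWSCS": {
                    # ...the device's uploaded key JSON fields...
                    "unsigned": {
                        "device_display_name": "Alice's phone",
                    },
                },
            },
        },
    }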
--- synapse/handlers/e2e_keys.py | 11 +++- synapse/storage/end_to_end_keys.py | 60 ++++++++++++----- tests/storage/test_end_to_end_keys.py | 92 +++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 20 deletions(-) create mode 100644 tests/storage/test_end_to_end_keys.py diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 950fc927b..bb69089b9 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -117,10 +117,15 @@ class E2eKeysHandler(object): results = yield self.store.get_e2e_device_keys(local_query) - # un-jsonify the results + # Build the result structure, un-jsonify the results, and add the + # "unsigned" section for user_id, device_keys in results.items(): - for device_id, json_bytes in device_keys.items(): - result_dict[user_id][device_id] = json.loads(json_bytes) + for device_id, device_info in device_keys.items(): + r = json.loads(device_info["key_json"]) + r["unsigned"] = { + "device_display_name": device_info["device_display_name"], + } + result_dict[user_id][device_id] = r defer.returnValue(result_dict) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 62b7790e9..5c8ed3e49 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import collections import twisted.internet.defer @@ -38,24 +39,49 @@ class EndToEndKeyStore(SQLBaseStore): query_list(list): List of pairs of user_ids and device_ids. Returns: Dict mapping from user-id to dict mapping from device_id to - key json byte strings. + dict containing "key_json", "device_display_name". """ - def _get_e2e_device_keys(txn): - result = {} - for user_id, device_id in query_list: - user_result = result.setdefault(user_id, {}) - keyvalues = {"user_id": user_id} - if device_id: - keyvalues["device_id"] = device_id - rows = self._simple_select_list_txn( - txn, table="e2e_device_keys_json", - keyvalues=keyvalues, - retcols=["device_id", "key_json"] - ) - for row in rows: - user_result[row["device_id"]] = row["key_json"] - return result - return self.runInteraction("get_e2e_device_keys", _get_e2e_device_keys) + if not query_list: + return {} + + return self.runInteraction( + "get_e2e_device_keys", self._get_e2e_device_keys_txn, query_list + ) + + def _get_e2e_device_keys_txn(self, txn, query_list): + query_clauses = [] + query_params = [] + + for (user_id, device_id) in query_list: + query_clause = "k.user_id = ?" + query_params.append(user_id) + + if device_id: + query_clause += " AND k.device_id = ?" 
+ query_params.append(device_id) + + query_clauses.append(query_clause) + + sql = ( + "SELECT k.user_id, k.device_id, " + " d.display_name AS device_display_name, " + " k.key_json" + " FROM e2e_device_keys_json k" + " LEFT JOIN devices d ON d.user_id = k.user_id" + " AND d.device_id = k.device_id" + " WHERE %s" + ) % ( + " OR ".join("("+q+")" for q in query_clauses) + ) + + txn.execute(sql, query_params) + rows = self.cursor_to_dict(txn) + + result = collections.defaultdict(dict) + for row in rows: + result[row["user_id"]][row["device_id"]] = row + + return result def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): def _add_e2e_one_time_keys(txn): diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py new file mode 100644 index 000000000..0ebc6dafe --- /dev/null +++ b/tests/storage/test_end_to_end_keys.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import synapse.api.errors +import tests.unittest +import tests.utils + + +class EndToEndKeyStoreTestCase(tests.unittest.TestCase): + def __init__(self, *args, **kwargs): + super(EndToEndKeyStoreTestCase, self).__init__(*args, **kwargs) + self.store = None # type: synapse.storage.DataStore + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_key_without_device_name(self): + now = 1470174257070 + json = '{ "key": "value" }' + + yield self.store.set_e2e_device_keys( + "user", "device", now, json) + + res = yield self.store.get_e2e_device_keys((("user", "device"),)) + self.assertIn("user", res) + self.assertIn("device", res["user"]) + dev = res["user"]["device"] + self.assertDictContainsSubset({ + "key_json": json, + "device_display_name": None, + }, dev) + + @defer.inlineCallbacks + def test_get_key_with_device_name(self): + now = 1470174257070 + json = '{ "key": "value" }' + + yield self.store.set_e2e_device_keys( + "user", "device", now, json) + yield self.store.store_device( + "user", "device", "display_name" + ) + + res = yield self.store.get_e2e_device_keys((("user", "device"),)) + self.assertIn("user", res) + self.assertIn("device", res["user"]) + dev = res["user"]["device"] + self.assertDictContainsSubset({ + "key_json": json, + "device_display_name": "display_name", + }, dev) + + + @defer.inlineCallbacks + def test_multiple_devices(self): + now = 1470174257070 + + yield self.store.set_e2e_device_keys( + "user1", "device1", now, 'json11') + yield self.store.set_e2e_device_keys( + "user1", "device2", now, 'json12') + yield self.store.set_e2e_device_keys( + "user2", "device1", now, 'json21') + yield self.store.set_e2e_device_keys( + "user2", "device2", now, 'json22') + + res = yield self.store.get_e2e_device_keys((("user1", "device1"), + ("user2", "device2"))) + self.assertIn("user1", res) + self.assertIn("device1", res["user1"]) + self.assertNotIn("device2", 
res["user1"]) + self.assertIn("user2", res) + self.assertNotIn("device1", res["user2"]) + self.assertIn("device2", res["user2"]) From 98385888b891f544c68b749746604b593f98d729 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Aug 2016 14:57:46 +0100 Subject: [PATCH 16/24] PEP8 --- synapse/storage/end_to_end_keys.py | 20 ++++++++++---------- tests/storage/test_end_to_end_keys.py | 2 -- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 5c8ed3e49..385d60705 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -63,16 +63,16 @@ class EndToEndKeyStore(SQLBaseStore): query_clauses.append(query_clause) sql = ( - "SELECT k.user_id, k.device_id, " - " d.display_name AS device_display_name, " - " k.key_json" - " FROM e2e_device_keys_json k" - " LEFT JOIN devices d ON d.user_id = k.user_id" - " AND d.device_id = k.device_id" - " WHERE %s" - ) % ( - " OR ".join("("+q+")" for q in query_clauses) - ) + "SELECT k.user_id, k.device_id, " + " d.display_name AS device_display_name, " + " k.key_json" + " FROM e2e_device_keys_json k" + " LEFT JOIN devices d ON d.user_id = k.user_id" + " AND d.device_id = k.device_id" + " WHERE %s" + ) % ( + " OR ".join("(" + q + ")" for q in query_clauses) + ) txn.execute(sql, query_params) rows = self.cursor_to_dict(txn) diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py index 0ebc6dafe..453bc6143 100644 --- a/tests/storage/test_end_to_end_keys.py +++ b/tests/storage/test_end_to_end_keys.py @@ -15,7 +15,6 @@ from twisted.internet import defer -import synapse.api.errors import tests.unittest import tests.utils @@ -68,7 +67,6 @@ class EndToEndKeyStoreTestCase(tests.unittest.TestCase): "device_display_name": "display_name", }, dev) - @defer.inlineCallbacks def test_multiple_devices(self): now = 1470174257070 From bcc9cda8ca75b5cc381ce10ba9b8e4af56c6bdaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:17:26 +0100 Subject: [PATCH 17/24] Fix copy + paste fails --- synapse/federation/federation_client.py | 15 ++++++++++----- synapse/federation/federation_server.py | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7eadcdd28..dde10967b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,8 +411,13 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - for i in xrange(0, len(missing_events), batch_size): - batch = missing_events[i:i + batch_size] + while missing_events: + batch = [] + try: + for _ in range(0, batch_size): + batch.append(missing_events.pop()) + except KeyError: + pass deferreds = [ self.get_pdu( @@ -423,9 +428,9 @@ class FederationClient(FederationBase): ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for (result, val), (e_id, _) in res: - if result and val: - signed_events.append(val) + for success, (result, e_id) in res: + if success and result: + signed_events.append(result) else: failed_to_fetch.add(e_id) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 40e9fda0e..35a01eecc 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -623,7 +623,7 @@ class FederationServer(FederationBase): origin, pdu.room_id, pdu.event_id, ) except: - logger.warn("Failed to get state for 
event: %s", pdu.event_id) + logger.exception("Failed to get state for event: %s", pdu.event_id) yield self.handler.on_receive_pdu( origin, From edb33eb163b6c60bfd2c3cab78a6bd13a47b6702 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Aug 2016 17:19:15 +0100 Subject: [PATCH 18/24] Rename fields to _ids --- synapse/federation/federation_client.py | 4 ++-- synapse/federation/federation_server.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dde10967b..264f3c0ae 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -322,8 +322,8 @@ class FederationClient(FederationBase): destination, room_id, event_id=event_id, ) - state_event_ids = result["pdus"] - auth_event_ids = result.get("auth_chain", []) + state_event_ids = result["pdu_ids"] + auth_event_ids = result.get("auth_chain_ids", []) fetched_events, failed_to_fetch = yield self.get_events( [destination], room_id, set(state_event_ids + auth_event_ids) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 35a01eecc..2b91f93e0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -231,8 +231,8 @@ class FederationServer(FederationBase): ) defer.returnValue((200, { - "pdus": [pdu.event_id for pdu in pdus], - "auth_chain": [pdu.event_id for pdu in auth_chain], + "pdu_ids": [pdu.event_id for pdu in pdus], + "auth_chain_ids": [pdu.event_id for pdu in auth_chain], })) @defer.inlineCallbacks From f131cd9e53b8131c5f7fa71765c717eadd001f16 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 4 Aug 2016 10:59:51 +0100 Subject: [PATCH 19/24] keys/query: Omit device displayname if null ... which makes it more consistent with user displaynames. 
--- synapse/handlers/e2e_keys.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index bb69089b9..2c7bfd91e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -122,9 +122,10 @@ class E2eKeysHandler(object): for user_id, device_keys in results.items(): for device_id, device_info in device_keys.items(): r = json.loads(device_info["key_json"]) - r["unsigned"] = { - "device_display_name": device_info["device_display_name"], - } + r["unsigned"] = {} + display_name = device_info["device_display_name"] + if display_name is not None: + r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r defer.returnValue(result_dict) From b0a14bf53e41484ae2adc3c036ec50f2332d2eca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 11:28:02 +0100 Subject: [PATCH 20/24] Handle the fact that some tables have negative rowid rows --- scripts/synapse_port_db | 154 +++++++++++++++++++++++++++++----------- 1 file changed, 111 insertions(+), 43 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index efd04da2d..69503cedb 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -34,7 +34,7 @@ logger = logging.getLogger("synapse_port_db") BOOLEAN_COLUMNS = { - "events": ["processed", "outlier"], + "events": ["processed", "outlier", "contains_url"], "rooms": ["is_public"], "event_edges": ["is_state"], "presence_list": ["accepted"], @@ -92,8 +92,12 @@ class Store(object): _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"] _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"] + _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"] + _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"] _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"] - _simple_select_one_onecol_txn = SQLBaseStore.__dict__["_simple_select_one_onecol_txn"] + _simple_select_one_onecol_txn = SQLBaseStore.__dict__[ + "_simple_select_one_onecol_txn" + ] _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"] _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"] @@ -158,31 +162,40 @@ class Porter(object): def setup_table(self, table): if table in APPEND_ONLY_TABLES: # It's safe to just carry on inserting. 
- next_chunk = yield self.postgres_store._simple_select_one_onecol( + row = yield self.postgres_store._simple_select_one( table="port_from_sqlite3", keyvalues={"table_name": table}, - retcol="rowid", + retcols=("forward_rowid", "backward_rowid"), allow_none=True, ) total_to_port = None - if next_chunk is None: + if row is None: if table == "sent_transactions": - next_chunk, already_ported, total_to_port = ( + forward_chunk, already_ported, total_to_port = ( yield self._setup_sent_transactions() ) + backward_chunk = 0 else: yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={"table_name": table, "rowid": 1} + values={ + "table_name": table, + "forward_rowid": 1, + "backward_rowid": 0, + } ) - next_chunk = 1 + forward_chunk = 1 + backward_chunk = 0 already_ported = 0 + else: + forward_chunk = row["forward_rowid"] + backward_chunk = row["backward_rowid"] if total_to_port is None: already_ported, total_to_port = yield self._get_total_count_to_port( - table, next_chunk + table, forward_chunk, backward_chunk ) else: def delete_all(txn): @@ -196,46 +209,85 @@ class Porter(object): yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={"table_name": table, "rowid": 0} + values={ + "table_name": table, + "forward_rowid": 1, + "backward_rowid": 0, + } ) - next_chunk = 1 + forward_chunk = 1 + backward_chunk = 0 already_ported, total_to_port = yield self._get_total_count_to_port( - table, next_chunk + table, forward_chunk, backward_chunk ) - defer.returnValue((table, already_ported, total_to_port, next_chunk)) + defer.returnValue( + (table, already_ported, total_to_port, forward_chunk, backward_chunk) + ) @defer.inlineCallbacks - def handle_table(self, table, postgres_size, table_size, next_chunk): + def handle_table(self, table, postgres_size, table_size, forward_chunk, + backward_chunk): if not table_size: return self.progress.add_table(table, postgres_size, table_size) if table == "event_search": - yield self.handle_search_table(postgres_size, table_size, next_chunk) + yield self.handle_search_table( + postgres_size, table_size, forward_chunk, backward_chunk + ) return - select = ( + forward_select = ( "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) ) + backward_select = ( + "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" 
+ % (table,) + ) + + do_forward = [True] + do_backward = [True] + while True: def r(txn): - txn.execute(select, (next_chunk, self.batch_size,)) - rows = txn.fetchall() - headers = [column[0] for column in txn.description] + forward_rows = [] + backward_rows = [] + if do_forward[0]: + txn.execute(forward_select, (forward_chunk, self.batch_size,)) + forward_rows = txn.fetchall() + if not forward_rows: + do_forward[0] = False + + if do_backward[0]: + txn.execute(backward_select, (backward_chunk, self.batch_size,)) + backward_rows = txn.fetchall() + if not backward_rows: + do_backward[0] = False + + if forward_rows or backward_rows: + headers = [column[0] for column in txn.description] + else: + headers = None - return headers, rows + return headers, forward_rows, backward_rows - headers, rows = yield self.sqlite_store.runInteraction("select", r) + headers, frows, brows = yield self.sqlite_store.runInteraction( + "select", r + ) - if rows: - next_chunk = rows[-1][0] + 1 + if frows or brows: + if frows: + forward_chunk = max(row[0] for row in frows) + 1 + if brows: + backward_chunk = min(row[0] for row in brows) - 1 + rows = frows + brows self._convert_rows(table, headers, rows) def insert(txn): @@ -247,7 +299,10 @@ class Porter(object): txn, table="port_from_sqlite3", keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, + updatevalues={ + "forward_rowid": forward_chunk, + "backward_rowid": backward_chunk, + }, ) yield self.postgres_store.execute(insert) @@ -259,7 +314,8 @@ class Porter(object): return @defer.inlineCallbacks - def handle_search_table(self, postgres_size, table_size, next_chunk): + def handle_search_table(self, postgres_size, table_size, forward_chunk, + backward_chunk): select = ( "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering" " FROM event_search as es" @@ -270,7 +326,7 @@ class Porter(object): while True: def r(txn): - txn.execute(select, (next_chunk, self.batch_size,)) + txn.execute(select, (forward_chunk, self.batch_size,)) rows = txn.fetchall() headers = [column[0] for column in txn.description] @@ -279,7 +335,7 @@ class Porter(object): headers, rows = yield self.sqlite_store.runInteraction("select", r) if rows: - next_chunk = rows[-1][0] + 1 + forward_chunk = rows[-1][0] + 1 # We have to treat event_search differently since it has a # different structure in the two different databases. 
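
The two-cursor scan above can be modelled in isolation. A toy version over
an in-memory list of rowids (the function and names here are mine, not the
script's; the real thing runs SQL inside transactions, and its backward
query has to hand back the largest rowids in each batch for the min() - 1
cursor step to be safe):

    def port_batches(rowids, batch_size=3):
        """Walk forwards from rowid 1 and backwards from rowid 0, so that
        rows with negative rowids are picked up too."""
        forward_chunk, backward_chunk = 1, 0
        batches = []
        while True:
            # Forward: the smallest rowids at or above the forward cursor.
            frows = sorted(r for r in rowids if r >= forward_chunk)[:batch_size]
            # Backward: the largest rowids at or below the backward cursor.
            brows = sorted(
                (r for r in rowids if r <= backward_chunk), reverse=True
            )[:batch_size]
            if not frows and not brows:
                return batches
            if frows:
                forward_chunk = max(frows) + 1
            if brows:
                backward_chunk = min(brows) - 1
            batches.append(frows + brows)

    # port_batches([-5, -2, 1, 2, 3, 7]) == [[1, 2, 3, -2, -5], [7]]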
@@ -312,7 +368,10 @@ class Porter(object): txn, table="port_from_sqlite3", keyvalues={"table_name": "event_search"}, - updatevalues={"rowid": next_chunk}, + updatevalues={ + "forward_rowid": forward_chunk, + "backward_rowid": backward_chunk, + }, ) yield self.postgres_store.execute(insert) @@ -324,7 +383,6 @@ class Porter(object): else: return - def setup_db(self, db_config, database_engine): db_conn = database_engine.module.connect( **{ @@ -395,7 +453,8 @@ class Porter(object): txn.execute( "CREATE TABLE port_from_sqlite3 (" " table_name varchar(100) NOT NULL UNIQUE," - " rowid bigint NOT NULL" + " forward_rowid bigint NOT NULL," + " backward_rowid bigint NOT NULL" ")" ) @@ -458,7 +517,7 @@ class Porter(object): @defer.inlineCallbacks def _setup_sent_transactions(self): # Only save things from the last day - yesterday = int(time.time()*1000) - 86400000 + yesterday = int(time.time() * 1000) - 86400000 # And save the max transaction id from each destination select = ( @@ -514,7 +573,11 @@ class Porter(object): yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={"table_name": "sent_transactions", "rowid": next_chunk} + values={ + "table_name": "sent_transactions", + "forward_rowid": next_chunk, + "backward_rowid": 0, + } ) def get_sent_table_size(txn): @@ -535,13 +598,18 @@ class Porter(object): defer.returnValue((next_chunk, inserted_rows, total_count)) @defer.inlineCallbacks - def _get_remaining_count_to_port(self, table, next_chunk): - rows = yield self.sqlite_store.execute_sql( + def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): + frows = yield self.sqlite_store.execute_sql( "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), - next_chunk, + forward_chunk, ) - defer.returnValue(rows[0][0]) + brows = yield self.sqlite_store.execute_sql( + "SELECT count(*) FROM %s WHERE rowid <= ?" 
% (table,), + backward_chunk, + ) + + defer.returnValue(frows[0][0] + brows[0][0]) @defer.inlineCallbacks def _get_already_ported_count(self, table): @@ -552,10 +620,10 @@ class Porter(object): defer.returnValue(rows[0][0]) @defer.inlineCallbacks - def _get_total_count_to_port(self, table, next_chunk): + def _get_total_count_to_port(self, table, forward_chunk, backward_chunk): remaining, done = yield defer.gatherResults( [ - self._get_remaining_count_to_port(table, next_chunk), + self._get_remaining_count_to_port(table, forward_chunk, backward_chunk), self._get_already_ported_count(table), ], consumeErrors=True, @@ -686,7 +754,7 @@ class CursesProgress(Progress): color = curses.color_pair(2) if perc == 100 else curses.color_pair(1) self.stdscr.addstr( - i+2, left_margin + max_len - len(table), + i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color, ) @@ -694,18 +762,18 @@ class CursesProgress(Progress): size = 20 progress = "[%s%s]" % ( - "#" * int(perc*size/100), - " " * (size - int(perc*size/100)), + "#" * int(perc * size / 100), + " " * (size - int(perc * size / 100)), ) self.stdscr.addstr( - i+2, left_margin + max_len + middle_space, + i + 2, left_margin + max_len + middle_space, "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]), ) if self.finished: self.stdscr.addstr( - rows-1, 0, + rows - 1, 0, "Press any key to exit...", ) From 7c7786d4e1ead0e1ea9bfd20b18bdbdcff806fb5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 11:35:49 +0100 Subject: [PATCH 21/24] Allow upgrading from old port_from_sqlite3 format --- scripts/synapse_port_db | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 69503cedb..66c61b019 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -458,6 +458,27 @@ class Porter(object): ")" ) + # The old port script created a table with just a "rowid" column. + # We want people to be able to rerun this script from an old port + # so that they can pick up any missing events that were not + # ported across. 
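
    # Schematically, the upgrade below rewrites the bookkeeping row like
    # so (values invented for illustration):

    # Old port_from_sqlite3 row: a single cursor.
    old_row = {"table_name": "events", "rowid": 12345}

    # After the ALTERs: the old cursor becomes the forward cursor, and a
    # new backward cursor defaults to 0 so negative rowids get scanned.
    new_row = {
        "table_name": old_row["table_name"],
        "forward_rowid": old_row["rowid"],
        "backward_rowid": 0,
    }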
+ def alter_table(txn): + txn.execute( + "ALTER TABLE IF EXISTS port_from_sqlite3" + " RENAME rowid TO forward_rowid" + ) + txn.execute( + "ALTER TABLE IF EXISTS port_from_sqlite3" + " ADD backward_rowid bigint NOT NULL DEFAULT 0" + ) + + try: + yield self.postgres_store.runInteraction( + "alter_table", alter_table + ) + except Exception as e: + logger.info("Failed to create port table: %s", e) + try: yield self.postgres_store.runInteraction( "create_port_table", create_port_table From 1fc50712d6729c85680e8ebe1c8612c304217c47 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 4 Aug 2016 12:31:55 +0100 Subject: [PATCH 22/24] Factor out more common code from the jenkins scripts (#980) * Factor out more common code from the jenkins scripts * Fix install_and_run path * Poke jenkins * Poke jenkins --- jenkins-dendron-postgres.sh | 63 ++++++------------------------------- jenkins-postgres.sh | 44 +++----------------------- jenkins-sqlite.sh | 39 ++--------------------- jenkins/clone.sh | 36 ++++++++++++++++----- 4 files changed, 44 insertions(+), 138 deletions(-) diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index e6e94cc8b..68912a896 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -4,62 +4,19 @@ set -eux : ${WORKSPACE:="$(pwd)"} +export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" - -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" - -rm .coverage* || echo "No coverage files to remove" - ./jenkins/prepare_synapse.sh - ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git ./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git - -: ${GOPATH:=${WORKSPACE}/.gopath} -if [[ "${GOPATH}" != *:* ]]; then - mkdir -p "${GOPATH}" - export PATH="${GOPATH}/bin:${PATH}" -fi -export GOPATH - -cd dendron - -go get github.com/constabulary/gb/... -gb generate -gb build - -cd ../sytest - -: ${PORT_BASE:=20000} -: ${PORT_COUNT=100} -export PORT_BASE -export PORT_COUNT - -./jenkins/prep_sytest_for_postgres.sh - -mkdir -p var - -echo >&2 "Running sytest with PostgreSQL"; - -TOX_BIN=$WORKSPACE/.tox/py27/bin -./jenkins/install_and_run.sh --python $TOX_BIN/python \ - --synapse-directory $WORKSPACE \ - --dendron $WORKSPACE/dendron/bin/dendron \ - --pusher \ - --synchrotron \ - --federation-reader \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) - -cd .. 
+./dendron/jenkins/build_dendron.sh +./sytest/jenkins/prep_sytest_for_postgres.sh + +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ + --dendron $WORKSPACE/dendron/bin/dendron \ + --pusher \ + --synchrotron \ + --federation-reader \ diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index edf61a45b..f2ca8ccdf 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -4,50 +4,14 @@ set -eux : ${WORKSPACE:="$(pwd)"} +export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" - -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" - -rm .coverage* || echo "No coverage files to remove" - ./jenkins/prepare_synapse.sh - ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git -: ${PORT_BASE:=20000} -: ${PORT_COUNT=100} -export PORT_BASE -export PORT_COUNT - -cd sytest - -./jenkins/prep_sytest_for_postgres.sh - -echo >&2 "Running sytest with PostgreSQL"; - -TOX_BIN=$WORKSPACE/.tox/py27/bin -./jenkins/install_and_run.sh --coverage \ - --python $TOX_BIN/python \ - --synapse-directory $WORKSPACE \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) - -cd .. -cp sytest/.coverage.* . +./sytest/jenkins/prep_sytest_for_postgres.sh -# Combine the coverage reports -echo "Combining:" .coverage.* -$TOX_BIN/python -m coverage combine -# Output coverage to coverage.xml -$TOX_BIN/coverage xml -o coverage.xml +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index 1c3530ebb..84613d979 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -8,43 +8,8 @@ export WORKSPACE export PYTHONDONTWRITEBYTECODE=yep export SYNAPSE_CACHE_FACTOR=1 -# Output test results as junit xml -export TRIAL_FLAGS="--reporter=subunit" -export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" -# Write coverage reports to a separate file for each process -export COVERAGE_OPTS="-p" -export DUMP_COVERAGE_COMMAND="coverage help" - -# Output flake8 violations to violations.flake8.log -# Don't exit with non-0 status code on Jenkins, -# so that the build steps continue and a later step can decided whether to -# UNSTABLE or FAILURE this build. -export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" - -rm .coverage* || echo "No coverage files to remove" - ./jenkins/prepare_synapse.sh - ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git -: ${PORT_BASE:=20000} -: ${PORT_COUNT=100} -export PORT_BASE -export PORT_COUNT - -cd sytest - -TOX_BIN=$WORKSPACE/.tox/py27/bin -./jenkins/install_and_run.sh --coverage \ - --python $TOX_BIN/python \ - --synapse-directory $WORKSPACE \ - --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) \ - -cd .. -cp sytest/.coverage.* . 
- -# Combine the coverage reports -echo "Combining:" .coverage.* -$TOX_BIN/python -m coverage combine -# Output coverage to coverage.xml -$TOX_BIN/coverage xml -o coverage.xml +./sytest/jenkins/install_and_run.sh \ + --synapse-directory $WORKSPACE \ diff --git a/jenkins/clone.sh b/jenkins/clone.sh index f56d076ea..ab30ac778 100755 --- a/jenkins/clone.sh +++ b/jenkins/clone.sh @@ -1,24 +1,44 @@ #! /bin/bash +# This clones a project from github into a named subdirectory +# If the project has a branch with the same name as this branch +# then it will checkout that branch after cloning. +# Otherwise it will checkout "origin/develop." +# The first argument is the name of the directory to checkout +# the branch into. +# The second argument is the URL of the remote repository to checkout. +# Usually something like https://github.com/matrix-org/sytest.git + +set -eux + NAME=$1 PROJECT=$2 BASE=".$NAME-base" -# update our clone -if [ ! -d .$NAME-base ]; then - git clone $PROJECT $BASE --mirror +# Update our mirror. +if [ ! -d ".$NAME-base" ]; then + # Create a local mirror of the source repository. + # This saves us from having to download the entire repository + # when this script is next run. + git clone "$PROJECT" "$BASE" --mirror else - (cd $BASE; git fetch -p) + # Fetch any updates from the source repository. + (cd "$BASE"; git fetch -p) fi -rm -rf $NAME -git clone $BASE $NAME --shared +# Remove the existing repository so that we have a clean copy +rm -rf "$NAME" +# Cloning with --shared means that we will share portions of the +# .git directory with our local mirror. +git clone "$BASE" "$NAME" --shared +# Jenkins may have supplied us with the name of the branch in the +# environment. Otherwise we will have to guess based on the current +# commit. : ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} -cd $NAME +cd "$NAME" # check out the relevant branch git checkout "${GIT_BRANCH}" || ( echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" git checkout "origin/develop" ) -git clean -df . 
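
The mirror-and-share dance in clone.sh generalises beyond Jenkins. A rough
Python equivalent, offered as a sketch only (it assumes git is on the PATH;
the function name is invented, while the fallback branch and the git flags
are taken from the script):

    import os
    import shutil
    import subprocess

    def clone_with_mirror(name, project, branch="origin/develop"):
        base = ".%s-base" % name
        if not os.path.isdir(base):
            # First run: take a full bare mirror of the remote, so later
            # runs never re-download the whole repository.
            subprocess.check_call(["git", "clone", project, base, "--mirror"])
        else:
            # Later runs: refresh the mirror, pruning deleted refs.
            subprocess.check_call(["git", "fetch", "-p"], cwd=base)

        # Throw away any previous working copy and make a cheap clone that
        # shares its object store with the local mirror.
        shutil.rmtree(name, ignore_errors=True)
        subprocess.check_call(["git", "clone", base, name, "--shared"])

        # Prefer the requested branch; fall back to develop.
        if subprocess.call(["git", "checkout", branch], cwd=name) != 0:
            subprocess.check_call(["git", "checkout", "origin/develop"], cwd=name)

The --mirror copy makes repeat runs cheap, and --shared keeps each working
copy from duplicating the object store, at the cost of the copy depending
on the mirror staying where it is.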
From e3ee63578f335037c73675209bb7861045c2027a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:01:18 +0100 Subject: [PATCH 23/24] Tidy up get_events --- synapse/federation/federation_client.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 264f3c0ae..ae0d65070 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -411,28 +411,26 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - while missing_events: - batch = [] - try: - for _ in range(0, batch_size): - batch.append(missing_events.pop()) - except KeyError: - pass + missing_events = len(missing_events) + for i in xrange(0, batch_size, batch_size): + batch = set(missing_events[i:i + batch_size]) deferreds = [ self.get_pdu( destinations=random_server_list(), event_id=e_id, - ).addBoth(lambda r, e: (r, e), e_id) + ) for e_id in batch ] res = yield defer.DeferredList(deferreds, consumeErrors=True) - for success, (result, e_id) in res: - if success and result: + for success, result in res: + if success: signed_events.append(result) - else: - failed_to_fetch.add(e_id) + batch.discard(result.event_id) + + # We removed all events we successfully fetched from `batch` + failed_to_fetch.update(batch) defer.returnValue((signed_events, failed_to_fetch)) From 257c41cc2e7163c42b8bcfa3a29d42ae5ac087b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Aug 2016 14:05:45 +0100 Subject: [PATCH 24/24] Fix typos. --- synapse/federation/federation_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ae0d65070..c6ed72016 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -396,7 +396,7 @@ class FederationClient(FederationBase): seen_events = yield self.store.have_events(event_ids) signed_events = [] - failed_to_fetch = [] + failed_to_fetch = set() missing_events = set(event_ids) for k in seen_events: @@ -411,8 +411,8 @@ class FederationClient(FederationBase): return srvs batch_size = 20 - missing_events = len(missing_events) - for i in xrange(0, batch_size, batch_size): + missing_events = list(missing_events) + for i in xrange(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [
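
For reference, the shape the fetch loop takes once the fixes above land,
reduced to synchronous pseudocode (the names are illustrative, and
fetch_one stands in for the Deferred-returning self.get_pdu):

    def fetch_in_batches(missing_events, fetch_one, batch_size=20):
        signed_events = []
        failed_to_fetch = set()
        # Sets cannot be sliced, hence the list() conversion, and the
        # range steps over the full length of the list, batch_size at a
        # time.
        missing_events = list(missing_events)
        for i in range(0, len(missing_events), batch_size):
            batch = set(missing_events[i:i + batch_size])
            for e_id in list(batch):
                result = fetch_one(e_id)
                if result is not None:
                    signed_events.append(result)
                    batch.discard(e_id)
            # Whatever is still in `batch` was not fetched successfully.
            failed_to_fetch.update(batch)
        return signed_events, failed_to_fetch

    # fetch_in_batches(["$a", "$b"], lambda e_id: None)
    #   == ([], {"$a", "$b"})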