mirror of https://github.com/watcha-fr/synapse
Merge pull request #106 from matrix-org/application-services-txn-reliability
Application services transaction reliability (PR #106)pull/4/merge
commit
5b999e206e
@ -0,0 +1,254 @@ |
||||
# -*- coding: utf-8 -*- |
||||
# Copyright 2015 OpenMarket Ltd |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
""" |
||||
This module controls the reliability for application service transactions. |
||||
|
||||
The nominal flow through this module looks like: |
||||
__________ |
||||
1---ASa[e]-->| Service |--> Queue ASa[f] |
||||
2----ASb[e]->| Queuer | |
||||
3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] |
||||
V |
||||
-````````- +------------+ |
||||
|````````|<--StoreTxn-|Transaction | |
||||
|Database| | Controller |---> SEND TO AS |
||||
`--------` +------------+ |
||||
What happens on SEND TO AS depends on the state of the Application Service: |
||||
- If the AS is marked as DOWN, do nothing. |
||||
- If the AS is marked as UP, send the transaction. |
||||
* SUCCESS : Increment where the AS is up to txn-wise and nuke the txn |
||||
contents from the db. |
||||
* FAILURE : Marked AS as DOWN and start Recoverer. |
||||
|
||||
Recoverer attempts to recover ASes who have died. The flow for this looks like: |
||||
,--------------------- backoff++ --------------. |
||||
V | |
||||
START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE |
||||
backoff DB and try to send it |
||||
^ |___________ |
||||
Mark AS as | V |
||||
UP & quit +---------- YES SUCCESS |
||||
| | | |
||||
NO <--- Have more txns? <------ Mark txn success & nuke <-+ |
||||
from db; incr AS pos. |
||||
Reset backoff. |
||||
|
||||
This is all tied together by the AppServiceScheduler which DIs the required |
||||
components. |
||||
""" |
||||
|
||||
from synapse.appservice import ApplicationServiceState |
||||
from twisted.internet import defer |
||||
import logging |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
|
||||
class AppServiceScheduler(object): |
||||
""" Public facing API for this module. Does the required DI to tie the |
||||
components together. This also serves as the "event_pool", which in this |
||||
case is a simple array. |
||||
""" |
||||
|
||||
def __init__(self, clock, store, as_api): |
||||
self.clock = clock |
||||
self.store = store |
||||
self.as_api = as_api |
||||
|
||||
def create_recoverer(service, callback): |
||||
return _Recoverer(clock, store, as_api, service, callback) |
||||
|
||||
self.txn_ctrl = _TransactionController( |
||||
clock, store, as_api, create_recoverer |
||||
) |
||||
self.queuer = _ServiceQueuer(self.txn_ctrl) |
||||
|
||||
@defer.inlineCallbacks |
||||
def start(self): |
||||
logger.info("Starting appservice scheduler") |
||||
# check for any DOWN ASes and start recoverers for them. |
||||
recoverers = yield _Recoverer.start( |
||||
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered |
||||
) |
||||
self.txn_ctrl.add_recoverers(recoverers) |
||||
|
||||
def submit_event_for_as(self, service, event): |
||||
self.queuer.enqueue(service, event) |
||||
|
||||
|
||||
class _ServiceQueuer(object): |
||||
"""Queues events for the same application service together, sending |
||||
transactions as soon as possible. Once a transaction is sent successfully, |
||||
this schedules any other events in the queue to run. |
||||
""" |
||||
|
||||
def __init__(self, txn_ctrl): |
||||
self.queued_events = {} # dict of {service_id: [events]} |
||||
self.pending_requests = {} # dict of {service_id: Deferred} |
||||
self.txn_ctrl = txn_ctrl |
||||
|
||||
def enqueue(self, service, event): |
||||
# if this service isn't being sent something |
||||
if not self.pending_requests.get(service.id): |
||||
self._send_request(service, [event]) |
||||
else: |
||||
# add to queue for this service |
||||
if service.id not in self.queued_events: |
||||
self.queued_events[service.id] = [] |
||||
self.queued_events[service.id].append(event) |
||||
|
||||
def _send_request(self, service, events): |
||||
# send request and add callbacks |
||||
d = self.txn_ctrl.send(service, events) |
||||
d.addBoth(self._on_request_finish) |
||||
d.addErrback(self._on_request_fail) |
||||
self.pending_requests[service.id] = d |
||||
|
||||
def _on_request_finish(self, service): |
||||
self.pending_requests[service.id] = None |
||||
# if there are queued events, then send them. |
||||
if (service.id in self.queued_events |
||||
and len(self.queued_events[service.id]) > 0): |
||||
self._send_request(service, self.queued_events[service.id]) |
||||
self.queued_events[service.id] = [] |
||||
|
||||
def _on_request_fail(self, err): |
||||
logger.error("AS request failed: %s", err) |
||||
|
||||
|
||||
class _TransactionController(object): |
||||
|
||||
def __init__(self, clock, store, as_api, recoverer_fn): |
||||
self.clock = clock |
||||
self.store = store |
||||
self.as_api = as_api |
||||
self.recoverer_fn = recoverer_fn |
||||
# keep track of how many recoverers there are |
||||
self.recoverers = [] |
||||
|
||||
@defer.inlineCallbacks |
||||
def send(self, service, events): |
||||
try: |
||||
txn = yield self.store.create_appservice_txn( |
||||
service=service, |
||||
events=events |
||||
) |
||||
service_is_up = yield self._is_service_up(service) |
||||
if service_is_up: |
||||
sent = yield txn.send(self.as_api) |
||||
if sent: |
||||
txn.complete(self.store) |
||||
else: |
||||
self._start_recoverer(service) |
||||
except Exception as e: |
||||
logger.exception(e) |
||||
self._start_recoverer(service) |
||||
# request has finished |
||||
defer.returnValue(service) |
||||
|
||||
@defer.inlineCallbacks |
||||
def on_recovered(self, recoverer): |
||||
self.recoverers.remove(recoverer) |
||||
logger.info("Successfully recovered application service AS ID %s", |
||||
recoverer.service.id) |
||||
logger.info("Remaining active recoverers: %s", len(self.recoverers)) |
||||
yield self.store.set_appservice_state( |
||||
recoverer.service, |
||||
ApplicationServiceState.UP |
||||
) |
||||
|
||||
def add_recoverers(self, recoverers): |
||||
for r in recoverers: |
||||
self.recoverers.append(r) |
||||
if len(recoverers) > 0: |
||||
logger.info("New active recoverers: %s", len(self.recoverers)) |
||||
|
||||
@defer.inlineCallbacks |
||||
def _start_recoverer(self, service): |
||||
yield self.store.set_appservice_state( |
||||
service, |
||||
ApplicationServiceState.DOWN |
||||
) |
||||
logger.info( |
||||
"Application service falling behind. Starting recoverer. AS ID %s", |
||||
service.id |
||||
) |
||||
recoverer = self.recoverer_fn(service, self.on_recovered) |
||||
self.add_recoverers([recoverer]) |
||||
recoverer.recover() |
||||
|
||||
@defer.inlineCallbacks |
||||
def _is_service_up(self, service): |
||||
state = yield self.store.get_appservice_state(service) |
||||
defer.returnValue(state == ApplicationServiceState.UP or state is None) |
||||
|
||||
|
||||
class _Recoverer(object): |
||||
|
||||
@staticmethod |
||||
@defer.inlineCallbacks |
||||
def start(clock, store, as_api, callback): |
||||
services = yield store.get_appservices_by_state( |
||||
ApplicationServiceState.DOWN |
||||
) |
||||
recoverers = [ |
||||
_Recoverer(clock, store, as_api, s, callback) for s in services |
||||
] |
||||
for r in recoverers: |
||||
logger.info("Starting recoverer for AS ID %s which was marked as " |
||||
"DOWN", r.service.id) |
||||
r.recover() |
||||
defer.returnValue(recoverers) |
||||
|
||||
def __init__(self, clock, store, as_api, service, callback): |
||||
self.clock = clock |
||||
self.store = store |
||||
self.as_api = as_api |
||||
self.service = service |
||||
self.callback = callback |
||||
self.backoff_counter = 1 |
||||
|
||||
def recover(self): |
||||
self.clock.call_later((2 ** self.backoff_counter), self.retry) |
||||
|
||||
def _backoff(self): |
||||
# cap the backoff to be around 18h => (2^16) = 65536 secs |
||||
if self.backoff_counter < 16: |
||||
self.backoff_counter += 1 |
||||
self.recover() |
||||
|
||||
@defer.inlineCallbacks |
||||
def retry(self): |
||||
try: |
||||
txn = yield self.store.get_oldest_unsent_txn(self.service) |
||||
if txn: |
||||
logger.info("Retrying transaction %s for AS ID %s", |
||||
txn.id, txn.service.id) |
||||
sent = yield txn.send(self.as_api) |
||||
if sent: |
||||
yield txn.complete(self.store) |
||||
# reset the backoff counter and retry immediately |
||||
self.backoff_counter = 1 |
||||
yield self.retry() |
||||
else: |
||||
self._backoff() |
||||
else: |
||||
self._set_service_recovered() |
||||
except Exception as e: |
||||
logger.exception(e) |
||||
self._backoff() |
||||
|
||||
def _set_service_recovered(self): |
||||
self.callback(self) |
@ -0,0 +1,31 @@ |
||||
/* Copyright 2015 OpenMarket Ltd |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
CREATE TABLE IF NOT EXISTS application_services_state( |
||||
as_id INTEGER PRIMARY KEY, |
||||
state TEXT, |
||||
last_txn TEXT, |
||||
FOREIGN KEY(as_id) REFERENCES application_services(id) |
||||
); |
||||
|
||||
CREATE TABLE IF NOT EXISTS application_services_txns( |
||||
as_id INTEGER NOT NULL, |
||||
txn_id INTEGER NOT NULL, |
||||
event_ids TEXT NOT NULL, |
||||
UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK |
||||
); |
||||
|
||||
|
||||
|
@ -0,0 +1,252 @@ |
||||
# -*- coding: utf-8 -*- |
||||
# Copyright 2015 OpenMarket Ltd |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
from synapse.appservice import ApplicationServiceState, AppServiceTransaction |
||||
from synapse.appservice.scheduler import ( |
||||
_ServiceQueuer, _TransactionController, _Recoverer |
||||
) |
||||
from twisted.internet import defer |
||||
from ..utils import MockClock |
||||
from mock import Mock |
||||
from tests import unittest |
||||
|
||||
|
||||
class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): |
||||
|
||||
def setUp(self): |
||||
self.clock = MockClock() |
||||
self.store = Mock() |
||||
self.as_api = Mock() |
||||
self.recoverer = Mock() |
||||
self.recoverer_fn = Mock(return_value=self.recoverer) |
||||
self.txnctrl = _TransactionController( |
||||
clock=self.clock, store=self.store, as_api=self.as_api, |
||||
recoverer_fn=self.recoverer_fn |
||||
) |
||||
|
||||
def test_single_service_up_txn_sent(self): |
||||
# Test: The AS is up and the txn is successfully sent. |
||||
service = Mock() |
||||
events = [Mock(), Mock()] |
||||
txn_id = "foobar" |
||||
txn = Mock(id=txn_id, service=service, events=events) |
||||
|
||||
# mock methods |
||||
self.store.get_appservice_state = Mock( |
||||
return_value=defer.succeed(ApplicationServiceState.UP) |
||||
) |
||||
txn.send = Mock(return_value=defer.succeed(True)) |
||||
self.store.create_appservice_txn = Mock( |
||||
return_value=defer.succeed(txn) |
||||
) |
||||
|
||||
# actual call |
||||
self.txnctrl.send(service, events) |
||||
|
||||
self.store.create_appservice_txn.assert_called_once_with( |
||||
service=service, events=events # txn made and saved |
||||
) |
||||
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made |
||||
txn.complete.assert_called_once_with(self.store) # txn completed |
||||
|
||||
def test_single_service_down(self): |
||||
# Test: The AS is down so it shouldn't push; Recoverers will do it. |
||||
# It should still make a transaction though. |
||||
service = Mock() |
||||
events = [Mock(), Mock()] |
||||
|
||||
txn = Mock(id="idhere", service=service, events=events) |
||||
self.store.get_appservice_state = Mock( |
||||
return_value=defer.succeed(ApplicationServiceState.DOWN) |
||||
) |
||||
self.store.create_appservice_txn = Mock( |
||||
return_value=defer.succeed(txn) |
||||
) |
||||
|
||||
# actual call |
||||
self.txnctrl.send(service, events) |
||||
|
||||
self.store.create_appservice_txn.assert_called_once_with( |
||||
service=service, events=events # txn made and saved |
||||
) |
||||
self.assertEquals(0, txn.send.call_count) # txn not sent though |
||||
self.assertEquals(0, txn.complete.call_count) # or completed |
||||
|
||||
def test_single_service_up_txn_not_sent(self): |
||||
# Test: The AS is up and the txn is not sent. A Recoverer is made and |
||||
# started. |
||||
service = Mock() |
||||
events = [Mock(), Mock()] |
||||
txn_id = "foobar" |
||||
txn = Mock(id=txn_id, service=service, events=events) |
||||
|
||||
# mock methods |
||||
self.store.get_appservice_state = Mock( |
||||
return_value=defer.succeed(ApplicationServiceState.UP) |
||||
) |
||||
self.store.set_appservice_state = Mock(return_value=defer.succeed(True)) |
||||
txn.send = Mock(return_value=defer.succeed(False)) # fails to send |
||||
self.store.create_appservice_txn = Mock( |
||||
return_value=defer.succeed(txn) |
||||
) |
||||
|
||||
# actual call |
||||
self.txnctrl.send(service, events) |
||||
|
||||
self.store.create_appservice_txn.assert_called_once_with( |
||||
service=service, events=events |
||||
) |
||||
self.assertEquals(1, self.recoverer_fn.call_count) # recoverer made |
||||
self.assertEquals(1, self.recoverer.recover.call_count) # and invoked |
||||
self.assertEquals(1, len(self.txnctrl.recoverers)) # and stored |
||||
self.assertEquals(0, txn.complete.call_count) # txn not completed |
||||
self.store.set_appservice_state.assert_called_once_with( |
||||
service, ApplicationServiceState.DOWN # service marked as down |
||||
) |
||||
|
||||
|
||||
class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): |
||||
|
||||
def setUp(self): |
||||
self.clock = MockClock() |
||||
self.as_api = Mock() |
||||
self.store = Mock() |
||||
self.service = Mock() |
||||
self.callback = Mock() |
||||
self.recoverer = _Recoverer( |
||||
clock=self.clock, |
||||
as_api=self.as_api, |
||||
store=self.store, |
||||
service=self.service, |
||||
callback=self.callback, |
||||
) |
||||
|
||||
def test_recover_single_txn(self): |
||||
txn = Mock() |
||||
# return one txn to send, then no more old txns |
||||
txns = [txn, None] |
||||
|
||||
def take_txn(*args, **kwargs): |
||||
return defer.succeed(txns.pop(0)) |
||||
self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn) |
||||
|
||||
self.recoverer.recover() |
||||
# shouldn't have called anything prior to waiting for exp backoff |
||||
self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) |
||||
txn.send = Mock(return_value=True) |
||||
# wait for exp backoff |
||||
self.clock.advance_time(2) |
||||
self.assertEquals(1, txn.send.call_count) |
||||
self.assertEquals(1, txn.complete.call_count) |
||||
# 2 because it needs to get None to know there are no more txns |
||||
self.assertEquals(2, self.store.get_oldest_unsent_txn.call_count) |
||||
self.callback.assert_called_once_with(self.recoverer) |
||||
self.assertEquals(self.recoverer.service, self.service) |
||||
|
||||
def test_recover_retry_txn(self): |
||||
txn = Mock() |
||||
txns = [txn, None] |
||||
pop_txn = False |
||||
|
||||
def take_txn(*args, **kwargs): |
||||
if pop_txn: |
||||
return defer.succeed(txns.pop(0)) |
||||
else: |
||||
return defer.succeed(txn) |
||||
self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn) |
||||
|
||||
self.recoverer.recover() |
||||
self.assertEquals(0, self.store.get_oldest_unsent_txn.call_count) |
||||
txn.send = Mock(return_value=False) |
||||
self.clock.advance_time(2) |
||||
self.assertEquals(1, txn.send.call_count) |
||||
self.assertEquals(0, txn.complete.call_count) |
||||
self.assertEquals(0, self.callback.call_count) |
||||
self.clock.advance_time(4) |
||||
self.assertEquals(2, txn.send.call_count) |
||||
self.assertEquals(0, txn.complete.call_count) |
||||
self.assertEquals(0, self.callback.call_count) |
||||
self.clock.advance_time(8) |
||||
self.assertEquals(3, txn.send.call_count) |
||||
self.assertEquals(0, txn.complete.call_count) |
||||
self.assertEquals(0, self.callback.call_count) |
||||
txn.send = Mock(return_value=True) # successfully send the txn |
||||
pop_txn = True # returns the txn the first time, then no more. |
||||
self.clock.advance_time(16) |
||||
self.assertEquals(1, txn.send.call_count) # new mock reset call count |
||||
self.assertEquals(1, txn.complete.call_count) |
||||
self.callback.assert_called_once_with(self.recoverer) |
||||
|
||||
|
||||
class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): |
||||
|
||||
def setUp(self): |
||||
self.txn_ctrl = Mock() |
||||
self.queuer = _ServiceQueuer(self.txn_ctrl) |
||||
|
||||
def test_send_single_event_no_queue(self): |
||||
# Expect the event to be sent immediately. |
||||
service = Mock(id=4) |
||||
event = Mock() |
||||
self.queuer.enqueue(service, event) |
||||
self.txn_ctrl.send.assert_called_once_with(service, [event]) |
||||
|
||||
def test_send_single_event_with_queue(self): |
||||
d = defer.Deferred() |
||||
self.txn_ctrl.send = Mock(return_value=d) |
||||
service = Mock(id=4) |
||||
event = Mock(event_id="first") |
||||
event2 = Mock(event_id="second") |
||||
event3 = Mock(event_id="third") |
||||
# Send an event and don't resolve it just yet. |
||||
self.queuer.enqueue(service, event) |
||||
# Send more events: expect send() to NOT be called multiple times. |
||||
self.queuer.enqueue(service, event2) |
||||
self.queuer.enqueue(service, event3) |
||||
self.txn_ctrl.send.assert_called_with(service, [event]) |
||||
self.assertEquals(1, self.txn_ctrl.send.call_count) |
||||
# Resolve the send event: expect the queued events to be sent |
||||
d.callback(service) |
||||
self.txn_ctrl.send.assert_called_with(service, [event2, event3]) |
||||
self.assertEquals(2, self.txn_ctrl.send.call_count) |
||||
|
||||
def test_multiple_service_queues(self): |
||||
# Tests that each service has its own queue, and that they don't block |
||||
# on each other. |
||||
srv1 = Mock(id=4) |
||||
srv_1_defer = defer.Deferred() |
||||
srv_1_event = Mock(event_id="srv1a") |
||||
srv_1_event2 = Mock(event_id="srv1b") |
||||
|
||||
srv2 = Mock(id=6) |
||||
srv_2_defer = defer.Deferred() |
||||
srv_2_event = Mock(event_id="srv2a") |
||||
srv_2_event2 = Mock(event_id="srv2b") |
||||
|
||||
send_return_list = [srv_1_defer, srv_2_defer] |
||||
self.txn_ctrl.send = Mock(side_effect=lambda x,y: send_return_list.pop(0)) |
||||
|
||||
# send events for different ASes and make sure they are sent |
||||
self.queuer.enqueue(srv1, srv_1_event) |
||||
self.queuer.enqueue(srv1, srv_1_event2) |
||||
self.txn_ctrl.send.assert_called_with(srv1, [srv_1_event]) |
||||
self.queuer.enqueue(srv2, srv_2_event) |
||||
self.queuer.enqueue(srv2, srv_2_event2) |
||||
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event]) |
||||
|
||||
# make sure callbacks for a service only send queued events for THAT |
||||
# service |
||||
srv_2_defer.callback(srv2) |
||||
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2]) |
||||
self.assertEquals(3, self.txn_ctrl.send.call_count) |
Loading…
Reference in new issue