|
|
|
@ -81,7 +81,7 @@ class ReplicationLayer(object): |
|
|
|
|
|
|
|
|
|
def register_edu_handler(self, edu_type, handler): |
|
|
|
|
if edu_type in self.edu_handlers: |
|
|
|
|
raise KeyError("Already have an EDU handler for %s" % (edu_type)) |
|
|
|
|
raise KeyError("Already have an EDU handler for %s" % (edu_type,)) |
|
|
|
|
|
|
|
|
|
self.edu_handlers[edu_type] = handler |
|
|
|
|
|
|
|
|
@ -102,7 +102,9 @@ class ReplicationLayer(object): |
|
|
|
|
object to encode as JSON. |
|
|
|
|
""" |
|
|
|
|
if query_type in self.query_handlers: |
|
|
|
|
raise KeyError("Already have a Query handler for %s" % (query_type)) |
|
|
|
|
raise KeyError( |
|
|
|
|
"Already have a Query handler for %s" % (query_type,) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
self.query_handlers[query_type] = handler |
|
|
|
|
|
|
|
|
@ -128,7 +130,10 @@ class ReplicationLayer(object): |
|
|
|
|
# TODO, add errback, etc. |
|
|
|
|
self._transaction_queue.enqueue_pdu(pdu, order) |
|
|
|
|
|
|
|
|
|
logger.debug("[%s] transaction_layer.enqueue_pdu... done", pdu.event_id) |
|
|
|
|
logger.debug( |
|
|
|
|
"[%s] transaction_layer.enqueue_pdu... done", |
|
|
|
|
pdu.event_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@log_function |
|
|
|
|
def send_edu(self, destination, edu_type, content): |
|
|
|
@ -317,7 +322,11 @@ class ReplicationLayer(object): |
|
|
|
|
|
|
|
|
|
if hasattr(transaction, "edus"): |
|
|
|
|
for edu in [Edu(**x) for x in transaction.edus]: |
|
|
|
|
self.received_edu(transaction.origin, edu.edu_type, edu.content) |
|
|
|
|
self.received_edu( |
|
|
|
|
transaction.origin, |
|
|
|
|
edu.edu_type, |
|
|
|
|
edu.content |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
results = yield defer.DeferredList(dl) |
|
|
|
|
|
|
|
|
|