|
|
|
@ -57,7 +57,7 @@ class ReplicationLayer(object): |
|
|
|
|
self.transport_layer.register_request_handler(self) |
|
|
|
|
|
|
|
|
|
self.store = hs.get_datastore() |
|
|
|
|
self.pdu_actions = PduActions(self.store) |
|
|
|
|
# self.pdu_actions = PduActions(self.store) |
|
|
|
|
self.transaction_actions = TransactionActions(self.store) |
|
|
|
|
|
|
|
|
|
self._transaction_queue = _TransactionQueue( |
|
|
|
@ -278,16 +278,16 @@ class ReplicationLayer(object): |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def on_context_pdus_request(self, context): |
|
|
|
|
pdus = yield self.pdu_actions.get_all_pdus_from_context( |
|
|
|
|
context |
|
|
|
|
raise NotImplementedError( |
|
|
|
|
"on_context_pdus_request is a security violation" |
|
|
|
|
) |
|
|
|
|
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def on_backfill_request(self, context, versions, limit): |
|
|
|
|
|
|
|
|
|
pdus = yield self.pdu_actions.backfill(context, versions, limit) |
|
|
|
|
pdus = yield self.handler.on_backfill_request( |
|
|
|
|
context, versions, limit |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) |
|
|
|
|
|
|
|
|
@ -383,20 +383,22 @@ class ReplicationLayer(object): |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def on_pull_request(self, origin, versions): |
|
|
|
|
transaction_id = max([int(v) for v in versions]) |
|
|
|
|
|
|
|
|
|
response = yield self.pdu_actions.after_transaction( |
|
|
|
|
transaction_id, |
|
|
|
|
origin, |
|
|
|
|
self.server_name |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if not response: |
|
|
|
|
response = [] |
|
|
|
|
|
|
|
|
|
defer.returnValue( |
|
|
|
|
(200, self._transaction_from_pdus(response).get_dict()) |
|
|
|
|
) |
|
|
|
|
raise NotImplementedError("Pull transacions not implemented") |
|
|
|
|
|
|
|
|
|
# transaction_id = max([int(v) for v in versions]) |
|
|
|
|
# |
|
|
|
|
# response = yield self.pdu_actions.after_transaction( |
|
|
|
|
# transaction_id, |
|
|
|
|
# origin, |
|
|
|
|
# self.server_name |
|
|
|
|
# ) |
|
|
|
|
# |
|
|
|
|
# if not response: |
|
|
|
|
# response = [] |
|
|
|
|
# |
|
|
|
|
# defer.returnValue( |
|
|
|
|
# (200, self._transaction_from_pdus(response).get_dict()) |
|
|
|
|
# ) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def on_query_request(self, query_type, args): |
|
|
|
@ -498,8 +500,7 @@ class ReplicationLayer(object): |
|
|
|
|
state = None |
|
|
|
|
|
|
|
|
|
# Get missing pdus if necessary. |
|
|
|
|
is_new = yield self.pdu_actions.is_new(pdu) |
|
|
|
|
if is_new and not pdu.outlier: |
|
|
|
|
if not pdu.outlier: |
|
|
|
|
# We only backfill backwards to the min depth. |
|
|
|
|
min_depth = yield self.store.get_min_depth_for_context(pdu.context) |
|
|
|
|
|
|
|
|
@ -539,7 +540,7 @@ class ReplicationLayer(object): |
|
|
|
|
else: |
|
|
|
|
ret = None |
|
|
|
|
|
|
|
|
|
yield self.pdu_actions.mark_as_processed(pdu) |
|
|
|
|
# yield self.pdu_actions.mark_as_processed(pdu) |
|
|
|
|
|
|
|
|
|
defer.returnValue(ret) |
|
|
|
|
|
|
|
|
|