|
|
|
@ -256,23 +256,21 @@ class ReplicationLayer(object): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def get_state_for_context(self, destination, context, event_id): |
|
|
|
|
"""Requests all of the `current` state PDUs for a given context from |
|
|
|
|
def get_state_for_room(self, destination, room_id, event_id): |
|
|
|
|
"""Requests all of the `current` state PDUs for a given room from |
|
|
|
|
a remote home server. |
|
|
|
|
|
|
|
|
|
Args: |
|
|
|
|
destination (str): The remote homeserver to query for the state. |
|
|
|
|
context (str): The context we're interested in. |
|
|
|
|
room_id (str): The id of the room we're interested in. |
|
|
|
|
event_id (str): The id of the event we want the state at. |
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
Deferred: Results in a list of PDUs. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
result = yield self.transport_layer.get_context_state( |
|
|
|
|
destination, |
|
|
|
|
context, |
|
|
|
|
event_id=event_id, |
|
|
|
|
result = yield self.transport_layer.get_room_state( |
|
|
|
|
destination, room_id, event_id=event_id, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
pdus = [ |
|
|
|
@ -288,9 +286,9 @@ class ReplicationLayer(object): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def get_event_auth(self, destination, context, event_id): |
|
|
|
|
def get_event_auth(self, destination, room_id, event_id): |
|
|
|
|
res = yield self.transport_layer.get_event_auth( |
|
|
|
|
destination, context, event_id, |
|
|
|
|
destination, room_id, event_id, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
auth_chain = [ |
|
|
|
@ -304,9 +302,9 @@ class ReplicationLayer(object): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def on_backfill_request(self, origin, context, versions, limit): |
|
|
|
|
def on_backfill_request(self, origin, room_id, versions, limit): |
|
|
|
|
pdus = yield self.handler.on_backfill_request( |
|
|
|
|
origin, context, versions, limit |
|
|
|
|
origin, room_id, versions, limit |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) |
|
|
|
@ -380,12 +378,10 @@ class ReplicationLayer(object): |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def on_context_state_request(self, origin, context, event_id): |
|
|
|
|
def on_context_state_request(self, origin, room_id, event_id): |
|
|
|
|
if event_id: |
|
|
|
|
pdus = yield self.handler.get_state_for_pdu( |
|
|
|
|
origin, |
|
|
|
|
context, |
|
|
|
|
event_id, |
|
|
|
|
origin, room_id, event_id, |
|
|
|
|
) |
|
|
|
|
auth_chain = yield self.store.get_auth_chain( |
|
|
|
|
[pdu.event_id for pdu in pdus] |
|
|
|
@ -413,7 +409,7 @@ class ReplicationLayer(object): |
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
|
def on_pull_request(self, origin, versions): |
|
|
|
|
raise NotImplementedError("Pull transacions not implemented") |
|
|
|
|
raise NotImplementedError("Pull transactions not implemented") |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def on_query_request(self, query_type, args): |
|
|
|
@ -422,30 +418,21 @@ class ReplicationLayer(object): |
|
|
|
|
defer.returnValue((200, response)) |
|
|
|
|
else: |
|
|
|
|
defer.returnValue( |
|
|
|
|
(404, "No handler for Query type '%s'" % (query_type, )) |
|
|
|
|
(404, "No handler for Query type '%s'" % (query_type,)) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def on_make_join_request(self, context, user_id): |
|
|
|
|
pdu = yield self.handler.on_make_join_request(context, user_id) |
|
|
|
|
def on_make_join_request(self, room_id, user_id): |
|
|
|
|
pdu = yield self.handler.on_make_join_request(room_id, user_id) |
|
|
|
|
time_now = self._clock.time_msec() |
|
|
|
|
defer.returnValue({ |
|
|
|
|
"event": pdu.get_pdu_json(time_now), |
|
|
|
|
}) |
|
|
|
|
defer.returnValue({"event": pdu.get_pdu_json(time_now)}) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def on_invite_request(self, origin, content): |
|
|
|
|
pdu = self.event_from_pdu_json(content) |
|
|
|
|
ret_pdu = yield self.handler.on_invite_request(origin, pdu) |
|
|
|
|
time_now = self._clock.time_msec() |
|
|
|
|
defer.returnValue( |
|
|
|
|
( |
|
|
|
|
200, |
|
|
|
|
{ |
|
|
|
|
"event": ret_pdu.get_pdu_json(time_now), |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def on_send_join_request(self, origin, content): |
|
|
|
@ -462,26 +449,17 @@ class ReplicationLayer(object): |
|
|
|
|
})) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def on_event_auth(self, origin, context, event_id): |
|
|
|
|
def on_event_auth(self, origin, room_id, event_id): |
|
|
|
|
time_now = self._clock.time_msec() |
|
|
|
|
auth_pdus = yield self.handler.on_event_auth(event_id) |
|
|
|
|
defer.returnValue( |
|
|
|
|
( |
|
|
|
|
200, |
|
|
|
|
{ |
|
|
|
|
"auth_chain": [ |
|
|
|
|
a.get_pdu_json(time_now) for a in auth_pdus |
|
|
|
|
], |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
) |
|
|
|
|
defer.returnValue((200, { |
|
|
|
|
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], |
|
|
|
|
})) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def make_join(self, destination, context, user_id): |
|
|
|
|
def make_join(self, destination, room_id, user_id): |
|
|
|
|
ret = yield self.transport_layer.make_join( |
|
|
|
|
destination=destination, |
|
|
|
|
context=context, |
|
|
|
|
user_id=user_id, |
|
|
|
|
destination, room_id, user_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
pdu_dict = ret["event"] |
|
|
|
@ -494,10 +472,10 @@ class ReplicationLayer(object): |
|
|
|
|
def send_join(self, destination, pdu): |
|
|
|
|
time_now = self._clock.time_msec() |
|
|
|
|
_, content = yield self.transport_layer.send_join( |
|
|
|
|
destination, |
|
|
|
|
pdu.room_id, |
|
|
|
|
pdu.event_id, |
|
|
|
|
pdu.get_pdu_json(time_now), |
|
|
|
|
destination=destination, |
|
|
|
|
room_id=pdu.room_id, |
|
|
|
|
event_id=pdu.event_id, |
|
|
|
|
content=pdu.get_pdu_json(time_now), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
logger.debug("Got content: %s", content) |
|
|
|
@ -507,9 +485,6 @@ class ReplicationLayer(object): |
|
|
|
|
for p in content.get("state", []) |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
# FIXME: We probably want to do something with the auth_chain given |
|
|
|
|
# to us |
|
|
|
|
|
|
|
|
|
auth_chain = [ |
|
|
|
|
self.event_from_pdu_json(p, outlier=True) |
|
|
|
|
for p in content.get("auth_chain", []) |
|
|
|
@ -523,11 +498,11 @@ class ReplicationLayer(object): |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def send_invite(self, destination, context, event_id, pdu): |
|
|
|
|
def send_invite(self, destination, room_id, event_id, pdu): |
|
|
|
|
time_now = self._clock.time_msec() |
|
|
|
|
code, content = yield self.transport_layer.send_invite( |
|
|
|
|
destination=destination, |
|
|
|
|
context=context, |
|
|
|
|
room_id=room_id, |
|
|
|
|
event_id=event_id, |
|
|
|
|
content=pdu.get_pdu_json(time_now), |
|
|
|
|
) |
|
|
|
@ -657,7 +632,7 @@ class ReplicationLayer(object): |
|
|
|
|
"_handle_new_pdu getting state for %s", |
|
|
|
|
pdu.room_id |
|
|
|
|
) |
|
|
|
|
state, auth_chain = yield self.get_state_for_context( |
|
|
|
|
state, auth_chain = yield self.get_state_for_room( |
|
|
|
|
origin, pdu.room_id, pdu.event_id, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
@ -816,7 +791,7 @@ class _TransactionQueue(object): |
|
|
|
|
logger.info("TX [%s] is ready for retry", destination) |
|
|
|
|
|
|
|
|
|
logger.info("TX [%s] _attempt_new_transaction", destination) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if destination in self.pending_transactions: |
|
|
|
|
# XXX: pending_transactions can get stuck on by a never-ending |
|
|
|
|
# request at which point pending_pdus_by_dest just keeps growing. |
|
|
|
@ -830,14 +805,15 @@ class _TransactionQueue(object): |
|
|
|
|
pending_failures = self.pending_failures_by_dest.pop(destination, []) |
|
|
|
|
|
|
|
|
|
if pending_pdus: |
|
|
|
|
logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) |
|
|
|
|
logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", |
|
|
|
|
destination, len(pending_pdus)) |
|
|
|
|
|
|
|
|
|
if not pending_pdus and not pending_edus and not pending_failures: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
logger.debug( |
|
|
|
|
"TX [%s] Attempting new transaction " |
|
|
|
|
"(pdus: %d, edus: %d, failures: %d)", |
|
|
|
|
"TX [%s] Attempting new transaction" |
|
|
|
|
" (pdus: %d, edus: %d, failures: %d)", |
|
|
|
|
destination, |
|
|
|
|
len(pending_pdus), |
|
|
|
|
len(pending_edus), |
|
|
|
|