|
|
|
@ -158,18 +158,40 @@ def runUntilCurrentTimer(func): |
|
|
|
|
|
|
|
|
|
@functools.wraps(func) |
|
|
|
|
def f(*args, **kwargs): |
|
|
|
|
pending_calls = len(reactor.getDelayedCalls()) |
|
|
|
|
now = reactor.seconds() |
|
|
|
|
num_pending = 0 |
|
|
|
|
|
|
|
|
|
# _newTimedCalls is one long list of *all* pending calls. Below loop |
|
|
|
|
# is based off of impl of reactor.runUntilCurrent |
|
|
|
|
for delayed_call in reactor._newTimedCalls: |
|
|
|
|
if delayed_call.time > now: |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
if delayed_call.delayed_time > 0: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
num_pending += 1 |
|
|
|
|
|
|
|
|
|
num_pending += len(reactor.threadCallQueue) |
|
|
|
|
|
|
|
|
|
start = time.time() * 1000 |
|
|
|
|
ret = func(*args, **kwargs) |
|
|
|
|
end = time.time() * 1000 |
|
|
|
|
tick_time.inc_by(end - start) |
|
|
|
|
pending_calls_metric.inc_by(pending_calls) |
|
|
|
|
pending_calls_metric.inc_by(num_pending) |
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
return f |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if hasattr(reactor, "runUntilCurrent"): |
|
|
|
|
try: |
|
|
|
|
# Ensure the reactor has all the attributes we expect |
|
|
|
|
reactor.runUntilCurrent |
|
|
|
|
reactor._newTimedCalls |
|
|
|
|
reactor.threadCallQueue |
|
|
|
|
|
|
|
|
|
# runUntilCurrent is called when we have pending calls. It is called once |
|
|
|
|
# per iteratation after fd polling. |
|
|
|
|
reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent) |
|
|
|
|
except AttributeError: |
|
|
|
|
pass |
|
|
|
|