|
|
|
|
@ -6,9 +6,7 @@ |
|
|
|
|
-- |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
local t_sort = table.sort; |
|
|
|
|
local t_insert = table.insert; |
|
|
|
|
local t_remove = table.remove; |
|
|
|
|
local t_concat = table.concat; |
|
|
|
|
local setmetatable = setmetatable; |
|
|
|
|
local tostring = tostring; |
|
|
|
|
@ -20,6 +18,7 @@ local log = require "util.logger".init("server_epoll"); |
|
|
|
|
local socket = require "socket"; |
|
|
|
|
local luasec = require "ssl"; |
|
|
|
|
local gettime = require "util.time".now; |
|
|
|
|
local indexedbheap = require "util.indexedbheap"; |
|
|
|
|
local createtable = require "util.table".create; |
|
|
|
|
local inet = require "util.net"; |
|
|
|
|
local inet_pton = inet.pton; |
|
|
|
|
@ -66,22 +65,24 @@ local fds = createtable(10, 0); -- FD -> conn |
|
|
|
|
|
|
|
|
|
-- Timer and scheduling -- |
|
|
|
|
|
|
|
|
|
local timers = {}; |
|
|
|
|
local timers = indexedbheap.create(); |
|
|
|
|
|
|
|
|
|
local function noop() end |
|
|
|
|
local function closetimer(t) |
|
|
|
|
t[1] = 0; |
|
|
|
|
t[2] = noop; |
|
|
|
|
timers:remove(t.id); |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
-- Set to true when timers have changed |
|
|
|
|
local resort_timers = false; |
|
|
|
|
local function reschedule(t, time) |
|
|
|
|
t[1] = time; |
|
|
|
|
timers:reprioritize(t.id, time); |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
-- Add absolute timer |
|
|
|
|
local function at(time, f) |
|
|
|
|
local timer = { time, f, close = closetimer }; |
|
|
|
|
t_insert(timers, timer); |
|
|
|
|
resort_timers = true; |
|
|
|
|
local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; |
|
|
|
|
timer.id = timers:insert(timer, time); |
|
|
|
|
return timer; |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@ -94,50 +95,32 @@ end |
|
|
|
|
-- Return time until next timeout |
|
|
|
|
local function runtimers(next_delay, min_wait) |
|
|
|
|
-- Any timers at all? |
|
|
|
|
if not timers[1] then |
|
|
|
|
return next_delay; |
|
|
|
|
local now = gettime(); |
|
|
|
|
local peek = timers:peek(); |
|
|
|
|
while peek do |
|
|
|
|
|
|
|
|
|
if peek > now then |
|
|
|
|
next_delay = peek - now; |
|
|
|
|
break; |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
if resort_timers then |
|
|
|
|
-- Sort earliest timers to the end |
|
|
|
|
t_sort(timers, function (a, b) return a[1] > b[1]; end); |
|
|
|
|
resort_timers = false; |
|
|
|
|
local _, timer, id = timers:pop(); |
|
|
|
|
local ok, ret = pcall(timer[2], now); |
|
|
|
|
if ok and type(ret) == "number" then |
|
|
|
|
local next_time = now+ret; |
|
|
|
|
timer[1] = next_time; |
|
|
|
|
timers:insert(timer, next_time); |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
-- Iterate from the end and remove completed timers |
|
|
|
|
for i = #timers, 1, -1 do |
|
|
|
|
local timer = timers[i]; |
|
|
|
|
local t, f = timer[1], timer[2]; |
|
|
|
|
-- Get time for every iteration to increase accuracy |
|
|
|
|
local now = gettime(); |
|
|
|
|
if t > now then |
|
|
|
|
-- This timer should not fire yet |
|
|
|
|
local diff = t - now; |
|
|
|
|
if diff < next_delay then |
|
|
|
|
next_delay = diff; |
|
|
|
|
peek = timers:peek(); |
|
|
|
|
end |
|
|
|
|
break; |
|
|
|
|
end |
|
|
|
|
local new_timeout = f(now); |
|
|
|
|
if new_timeout then |
|
|
|
|
-- Schedule for 'delay' from the time actually scheduled, |
|
|
|
|
-- not from now, in order to prevent timer drift. |
|
|
|
|
timer[1] = t + new_timeout; |
|
|
|
|
resort_timers = true; |
|
|
|
|
else |
|
|
|
|
t_remove(timers, i); |
|
|
|
|
end |
|
|
|
|
if peek == nil then |
|
|
|
|
return next_delay; |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
if resort_timers or next_delay < min_wait then |
|
|
|
|
-- Timers may be added from within a timer callback. |
|
|
|
|
-- Those would not be considered for next_delay, |
|
|
|
|
-- and we might sleep for too long, so instead |
|
|
|
|
-- we return a shorter timeout so we can |
|
|
|
|
-- properly sort all new timers. |
|
|
|
|
next_delay = min_wait; |
|
|
|
|
if next_delay < min_wait then |
|
|
|
|
return min_wait; |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
return next_delay; |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@ -243,8 +226,7 @@ function interface:setreadtimeout(t) |
|
|
|
|
end |
|
|
|
|
t = t or cfg.read_timeout; |
|
|
|
|
if self._readtimeout then |
|
|
|
|
self._readtimeout[1] = gettime() + t; |
|
|
|
|
resort_timers = true; |
|
|
|
|
self._readtimeout:reschedule(gettime() + t); |
|
|
|
|
else |
|
|
|
|
self._readtimeout = addtimer(t, function () |
|
|
|
|
if self:on("readtimeout") then |
|
|
|
|
@ -268,8 +250,7 @@ function interface:setwritetimeout(t) |
|
|
|
|
end |
|
|
|
|
t = t or cfg.send_timeout; |
|
|
|
|
if self._writetimeout then |
|
|
|
|
self._writetimeout[1] = gettime() + t; |
|
|
|
|
resort_timers = true; |
|
|
|
|
self._writetimeout:reschedule(gettime() + t); |
|
|
|
|
else |
|
|
|
|
self._writetimeout = addtimer(t, function () |
|
|
|
|
self:on("disconnect", "write timeout"); |
|
|
|
|
|