turnutils_peer: Linux fast path with drain loop, recvmmsg/sendmmsg, UDP-GSO

The libevent EV_READ handler used to do one recvfrom + one sendto per
ready event, so a packet flood through the relay generated O(N) libevent
re-entries and 2N syscalls per N relayed datagrams — saturating one core
on the loadgen-side peer well below modern relay throughput.
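
For reference, the replaced per-event path was shaped roughly like the
sketch below (simplified: buffer size and error handling are illustrative,
not the exact original code):

  static void udp_server_input_handler(evutil_socket_t fd, short what, void *arg) {
    if (!(what & EV_READ)) {
      return;
    }
    (void)arg;
    ioa_addr src;
    socklen_t slen = (socklen_t)sizeof(src);
    uint8_t buf[4096];
    /* one datagram per libevent callback: two syscalls, then re-enter */
    ssize_t len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)&src, &slen);
    if (len >= 0) {
      sendto(fd, buf, (size_t)len, 0, (const struct sockaddr *)&src, slen);
    }
  }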

On Linux, replace the handler with:
  * a drain loop: keep recvmmsg'ing in MSG_DONTWAIT until the queue
    returns less than a full batch, bounded by MAX_DRAIN_ROUNDS so a
    flood can't starve the rest of the event loop (per-event syscall
    budget worked out after this list);
  * recvmmsg into a static mmsghdr[32] (peer is single-threaded) and
    reuse the same mmsghdr array for sendmmsg back — each entry already
    has msg_name pointing at the source (the echo destination) and the
    iovec pointing at the received bytes, so no userspace copy;
  * UDP-GSO: when the recvmmsg batch is homogeneous (≥2 entries, same
    source, same size, ≤1472 B), echo it as one sendmsg with UDP_SEGMENT
    cmsg so the kernel allocates one super-skb that traverses the
    network stack once.
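
With the patch constants (PEER_BATCH = 32, MAX_DRAIN_ROUNDS = 8), the
per-readiness-event budget works out to:

  max datagrams per event:  32 * 8 = 256
  worst-case syscalls:      8 recvmmsg + 8 sendmmsg = 16
                            (vs. 512 recvfrom/sendto calls before)
  homogeneous GSO batches:  8 recvmmsg + 8 sendmsg, each sendmsg handing
                            the kernel up to 32 segments as one super-skb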

The non-Linux build keeps the original recvfrom/sendto handler.

DigitalOcean nyc1 c-4, 30 s alternating A/B runs paired with the GSO
turnserver (-Y packet -m 1):
  old peer: turn TX mean 228 k pps, peer CPU mean 91.0 % (saturated)
  new peer: turn TX mean 255 k pps, peer CPU mean 28.8 %

Peer CPU drops 3.2× while turn-side throughput climbs ~12 % because the
saturated old peer could not reflect traffic at the GSO turnserver's full
rate. The peer is no longer the loadgen-side bottleneck, freeing CPU for
multi-flow tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
src/apps/peer/udpserver.c

@@ -32,15 +32,175 @@
 * SUCH DAMAGE.
 */
#if defined(__linux__) && !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif
#include "udpserver.h"
#include "apputils.h"
#include "stun_buffer.h"
#include <errno.h>
#include <string.h>
#include <limits.h> // for USHRT_MAX
#if defined(__linux__)
#include <netinet/udp.h>
#include <sys/socket.h>
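/* UDP_SEGMENT landed in Linux 4.18; define fallbacks for older UAPI headers
 * (the numeric values match the kernel ABI). */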
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif
#ifndef SOL_UDP
#define SOL_UDP 17
#endif
#endif
/////////////// io handlers ///////////////////
#if defined(__linux__)

/* Per-callback batch state. The peer is single-threaded (one libevent
 * event_base, all sockets dispatched serially), so module-static buffers
 * are race-free and avoid per-callback allocation. */
#define PEER_BATCH 32

/* Max bytes we are willing to echo per datagram. STUN packets are small;
 * 4 KiB covers anything that fits in a Path-MTU UDP datagram with comfortable
 * headroom and keeps the static state at PEER_BATCH * 4 KiB = 128 KiB. */
#define PEER_DGRAM_MAX 4096

/* Bound the drain loop so a packet flood can't starve the rest of the event
 * loop. PEER_BATCH * MAX_DRAIN_ROUNDS sets the upper bound on packets handled
 * per readiness event. */
#define MAX_DRAIN_ROUNDS 8

/* UDP-GSO: a single sendmsg can describe many segments of gso_size bytes.
 * We only enable it when batch entries share source AND size: a deliberately
 * conservative predicate that matches the synthetic packet-flood pattern
 * without misbehaving on heterogeneous traffic. */
#define PEER_GSO_MAX_SEGSZ 1472

static struct mmsghdr g_msgs[PEER_BATCH];
static struct iovec g_iovs[PEER_BATCH];
static ioa_addr g_srcs[PEER_BATCH];
static uint8_t g_bufs[PEER_BATCH][PEER_DGRAM_MAX];
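
/* Echo the whole batch as a single UDP-GSO sendmsg when possible. Returns n
 * if the batch was homogeneous and the send succeeded; returns 0 otherwise so
 * the caller falls back to plain sendmmsg. */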
static int try_gso_echo(evutil_socket_t fd, int n) {
  if (n < 2) {
    return 0;
  }
  const uint32_t s0 = (uint32_t)g_msgs[0].msg_len;
  if (s0 == 0 || s0 > PEER_GSO_MAX_SEGSZ) {
    return 0;
  }

  /* Homogeneity check: every entry must share the first entry's size and
   * source address, or GSO does not apply. */
  const socklen_t namelen0 = g_msgs[0].msg_hdr.msg_namelen;
  for (int i = 1; i < n; ++i) {
    if (g_msgs[i].msg_len != s0) {
      return 0;
    }
    if (g_msgs[i].msg_hdr.msg_namelen != namelen0) {
      return 0;
    }
    if (memcmp(&g_srcs[i], &g_srcs[0], namelen0) != 0) {
      return 0;
    }
  }

  struct iovec sendiov[PEER_BATCH];
  for (int i = 0; i < n; ++i) {
    sendiov[i].iov_base = g_bufs[i];
    sendiov[i].iov_len = (size_t)s0;
  }

  union {
    struct cmsghdr align;
    char buf[CMSG_SPACE(sizeof(uint16_t))];
  } cmsg_buf = {0};

  struct msghdr mh = {0};
  mh.msg_iov = sendiov;
  mh.msg_iovlen = (size_t)n;
  mh.msg_name = &g_srcs[0];
  mh.msg_namelen = namelen0;
  mh.msg_control = cmsg_buf.buf;
  mh.msg_controllen = sizeof(cmsg_buf.buf);

  struct cmsghdr *cm = CMSG_FIRSTHDR(&mh);
  cm->cmsg_level = SOL_UDP;
  cm->cmsg_type = UDP_SEGMENT;
  cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
  uint16_t seg = (uint16_t)s0;
  memcpy(CMSG_DATA(cm), &seg, sizeof(seg));

  ssize_t rc;
  do {
    rc = sendmsg(fd, &mh, 0);
  } while (rc < 0 && errno == EINTR);
  if (rc < 0) {
    return 0;
  }
  return n;
}
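
/* Linux fast-path read handler: drain the socket with bounded recvmmsg
 * rounds, echoing each batch back via one GSO sendmsg when the batch is
 * homogeneous, else via sendmmsg on the same mmsghdr array. */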
static void udp_server_input_handler(evutil_socket_t fd, short what, void *arg) {
  if (!(what & EV_READ)) {
    return;
  }
  (void)arg; /* bind addr unused; recvmmsg fills msg_name with the actual
              * source for each datagram, which is also the echo destination. */

  for (int round = 0; round < MAX_DRAIN_ROUNDS; ++round) {
    for (int i = 0; i < PEER_BATCH; ++i) {
      g_iovs[i].iov_base = g_bufs[i];
      g_iovs[i].iov_len = sizeof(g_bufs[i]);
      memset(&g_msgs[i].msg_hdr, 0, sizeof(g_msgs[i].msg_hdr));
      g_msgs[i].msg_hdr.msg_iov = &g_iovs[i];
      g_msgs[i].msg_hdr.msg_iovlen = 1;
      g_msgs[i].msg_hdr.msg_name = &g_srcs[i];
      g_msgs[i].msg_hdr.msg_namelen = sizeof(g_srcs[i]);
      g_msgs[i].msg_len = 0;
    }

    int n;
    do {
      n = recvmmsg(fd, g_msgs, PEER_BATCH, MSG_DONTWAIT, NULL);
    } while (n < 0 && errno == EINTR);
    if (n <= 0) {
      return;
    }

    int sent = try_gso_echo(fd, n);
    if (!sent) {
      /* Reuse the same mmsghdr array for sendmmsg: each entry already has
       * msg_name pointing at the source (which is the echo destination) and
       * the iovec pointing at the received bytes. Shrink iov_len to the
       * actual recv'd size and fire. No userspace copy. */
      for (int i = 0; i < n; ++i) {
        g_iovs[i].iov_len = (size_t)g_msgs[i].msg_len;
      }
      int s = 0;
      while (s < n) {
        int r;
        do {
          r = sendmmsg(fd, &g_msgs[s], (unsigned int)(n - s), 0);
        } while (r < 0 && errno == EINTR);
        if (r <= 0) {
          break;
        }
        s += r;
      }
    }

    if (n < PEER_BATCH) {
      /* recvmmsg returned a partial batch; the kernel queue is drained. */
      return;
    }
  }
}
#else /* !__linux__ */
static void udp_server_input_handler(evutil_socket_t fd, short what, void *arg) {
  if (!(what & EV_READ)) {

@@ -67,6 +227,8 @@ static void udp_server_input_handler(evutil_socket_t fd, short what, void *arg)
  }
}
#endif /* __linux__ */
///////////////////// operations //////////////////////////
static int udp_create_server_socket(server_type *const server, const char *const ifname,
