oauth: Add unit tests for multiplexer handling

To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.

This commit is a replay of 1443b6c0e, which was reverted due to
buildfarm failures. Compared with that, this version protects the build
targets in the Makefile with a with_libcurl conditional, and it tweaks
the code style in 001_oauth.pl.

Reviewed-by: Dagfinn Ilmari Mannsåker <ilmari@ilmari.org>
Reviewed-by: Andrew Dunstan <andrew@dunslane.net>
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
Discussion: https://postgr.es/m/CAOYmi+m=xY0P_uAzAP_884uF-GhQ3wrineGwc9AEnb6fYxVqVQ@mail.gmail.com
master
Jacob Champion 2 weeks ago
parent 52ecd05aee
commit 4e1e417330
  1. 36
      src/interfaces/libpq-oauth/Makefile
  2. 35
      src/interfaces/libpq-oauth/meson.build
  3. 24
      src/interfaces/libpq-oauth/t/001_oauth.pl
  4. 527
      src/interfaces/libpq-oauth/test-oauth-curl.c

@ -54,10 +54,6 @@ SHLIB_EXPORTS = exports.txt
# Disable -bundle_loader on macOS.
BE_DLLLIBS =
# By default, a library without an SONAME doesn't get a static library, so we
# add it to the build explicitly.
all: all-lib all-static-lib
# Shared library stuff
include $(top_srcdir)/src/Makefile.shlib
@ -66,6 +62,28 @@ include $(top_srcdir)/src/Makefile.shlib
%_shlib.o: %.c %.o
$(CC) $(CFLAGS) $(CFLAGS_SL) $(CPPFLAGS) $(CPPFLAGS_SHLIB) -c $< -o $@
.PHONY: all-tests
all-tests: oauth_tests$(X)
oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
#
# Top-Level Targets
#
# The existence of a t/ folder induces the buildfarm to run Make directly on
# this subdirectory, bypassing the recursion skip in src/interfaces/Makefile.
# Wrap the standard build targets in a with_libcurl conditional to avoid
# building OAuth code on platforms that haven't requested it. (The "clean"-style
# targets remain available.)
#
ifeq ($(with_libcurl), yes)
# By default, a library without an SONAME doesn't get a static library, so we
# add it to the build explicitly.
all: all-lib all-static-lib
# Ignore the standard rules for SONAME-less installation; we want both the
# static and shared libraries to go into libdir.
install: all installdirs $(stlib) $(shlib)
@ -75,9 +93,19 @@ install: all installdirs $(stlib) $(shlib)
installdirs:
$(MKDIR_P) '$(DESTDIR)$(libdir)'
check: all-tests
$(prove_check)
installcheck: all-tests
$(prove_installcheck)
endif # with_libcurl
uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
rm -f test-oauth-curl.o oauth_tests$(X)
rm -rf tmp_check

@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
libpq_oauth_test_deps = []
oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
if host_system == 'windows'
oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
'--NAME', 'oauth_tests',
'--FILEDESC', 'OAuth unit test program',])
endif
libpq_oauth_test_deps += executable('oauth_tests',
oauth_test_sources,
dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
kwargs: default_bin_args + {
'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
'c_pch': pch_postgres_fe_h,
'include_directories': [libpq_inc, postgres_inc],
'install': false,
}
)
testprep_targets += libpq_oauth_test_deps
tests += {
'name': 'libpq-oauth',
'sd': meson.current_source_dir(),
'bd': meson.current_build_dir(),
'tap': {
'tests': [
't/001_oauth.pl',
],
'deps': libpq_oauth_test_deps,
},
}

@ -0,0 +1,24 @@
# Copyright (c) 2025, PostgreSQL Global Development Group
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Utils;
use Test::More;
# Defer entirely to the oauth_tests executable. stdout/err is routed through
# Test::More so that our logging infrastructure can handle it correctly. Using
# IPC::Run::new_chunker seems to help interleave the two streams a little better
# than without.
#
# TODO: prove can also deal with native executables itself, which we could
# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
# calls Perl directly, which would require more code to work around... and
# there's still the matter of logging.
my $builder = Test::More->builder;
my $out = $builder->output;
my $err = $builder->failure_output;
IPC::Run::run ['oauth_tests'],
'>' => (IPC::Run::new_chunker, sub { $out->print($_[0]) }),
'2>' => (IPC::Run::new_chunker, sub { $err->print($_[0]) })
or die "oauth_tests returned $?";

@ -0,0 +1,527 @@
/*
* test-oauth-curl.c
*
* A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
* the tests reference static functions and other internals.
*
* USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
* must-succeed code as part of test setup.
*
* Copyright (c) 2025, PostgreSQL Global Development Group
*/
#include "oauth-curl.c"
#include <fcntl.h>
#ifdef USE_ASSERT_CHECKING
/*
* TAP Helpers
*/
static int num_tests = 0;
/*
* Reports ok/not ok to the TAP stream on stdout.
*/
#define ok(OK, TEST) \
ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
static bool
ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
{
printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
if (!ok)
{
printf("# at %s:%d:\n", file, line);
printf("# expression is false: %s\n", teststr);
}
return ok;
}
/*
* Like ok(this == that), but with more diagnostics on failure.
*
* Only works on ints, but luckily that's all we need here. Note that the much
* simpler-looking macro implementation
*
* is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
*
* suffers from multiple evaluation of the macro arguments...
*/
#define is(THIS, THAT, TEST) \
do { \
int this_ = (THIS), \
that_ = (THAT); \
is_diag( \
ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
this_, #THIS, that_, #THAT \
); \
} while (0)
static bool
is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
{
if (!ok)
printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
return ok;
}
/*
* Utilities
*/
/*
* Creates a partially-initialized async_ctx for the purposes of testing. Free
* with free_test_actx().
*/
static struct async_ctx *
init_test_actx(void)
{
struct async_ctx *actx;
actx = calloc(1, sizeof(*actx));
Assert(actx);
actx->mux = PGINVALID_SOCKET;
actx->timerfd = -1;
actx->debugging = true;
initPQExpBuffer(&actx->errbuf);
Assert(setup_multiplexer(actx));
return actx;
}
static void
free_test_actx(struct async_ctx *actx)
{
termPQExpBuffer(&actx->errbuf);
if (actx->mux != PGINVALID_SOCKET)
close(actx->mux);
if (actx->timerfd >= 0)
close(actx->timerfd);
free(actx);
}
static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
/*
* Writes to the write side of a pipe until it won't take any more data. Returns
* the amount written.
*/
static ssize_t
fill_pipe(int fd)
{
int mode;
ssize_t written = 0;
/* Don't block. */
Assert((mode = fcntl(fd, F_GETFL)) != -1);
Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
while (true)
{
ssize_t w;
w = write(fd, dummy_buf, sizeof(dummy_buf));
if (w < 0)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
perror("write to pipe");
written = -1;
}
break;
}
written += w;
}
/* Reset the descriptor flags. */
Assert(fcntl(fd, F_SETFD, mode) == 0);
return written;
}
/*
* Drains the requested amount of data from the read side of a pipe.
*/
static bool
drain_pipe(int fd, ssize_t n)
{
Assert(n > 0);
while (n)
{
size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
ssize_t drained;
drained = read(fd, dummy_buf, to_read);
if (drained < 0)
{
perror("read from pipe");
return false;
}
n -= drained;
}
return true;
}
/*
* Tests whether the multiplexer is marked ready by the deadline. This is a
* macro so that file/line information makes sense during failures.
*
* NB: our current multiplexer implementations (epoll/kqueue) are *readable*
* when the underlying libcurl sockets are *writable*. This behavior is pinned
* here to record that expectation; PGRES_POLLING_READING is hardcoded
* throughout the flow and would need to be changed if a new multiplexer does
* something different.
*/
#define mux_is_ready(MUX, DEADLINE, TEST) \
do { \
int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
Assert(res_ != -1); \
ok(res_ > 0, "multiplexer is ready " TEST); \
} while (0)
/*
* The opposite of mux_is_ready().
*/
#define mux_is_not_ready(MUX, TEST) \
do { \
int res_ = PQsocketPoll(MUX, 1, 0, 0); \
Assert(res_ != -1); \
is(res_, 0, "multiplexer is not ready " TEST); \
} while (0)
/*
* Test Suites
*/
/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
static void
test_set_timer(void)
{
struct async_ctx *actx = init_test_actx();
const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
printf("# test_set_timer\n");
/* A zero-duration timer should result in a near-immediate ready signal. */
Assert(set_timer(actx, 0));
mux_is_ready(actx->mux, deadline, "when timer expires");
is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
/* Resetting the timer far in the future should unset the ready signal. */
Assert(set_timer(actx, INT_MAX));
mux_is_not_ready(actx->mux, "when timer is reset to the future");
is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
/* Setting another zero-duration timer should override the previous one. */
Assert(set_timer(actx, 0));
mux_is_ready(actx->mux, deadline, "when timer is re-expired");
is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
/* And disabling that timer should once again unset the ready signal. */
Assert(set_timer(actx, -1));
mux_is_not_ready(actx->mux, "when timer is unset");
is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
{
bool expired;
/* Make sure drain_timer_events() functions correctly as well. */
Assert(set_timer(actx, 0));
mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
Assert(drain_timer_events(actx, &expired));
mux_is_not_ready(actx->mux, "when timer is drained after expiring");
is(expired, 1, "drain_timer_events() reports expiration");
is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
/* A second drain should do nothing. */
Assert(drain_timer_events(actx, &expired));
mux_is_not_ready(actx->mux, "when timer is drained a second time");
is(expired, 0, "drain_timer_events() reports no expiration");
is(timer_expired(actx), 0, "timer_expired() still returns 0");
}
free_test_actx(actx);
}
static void
test_register_socket(void)
{
struct async_ctx *actx = init_test_actx();
int pipefd[2];
int rfd,
wfd;
bool bidirectional;
/* Create a local pipe for communication. */
Assert(pipe(pipefd) == 0);
rfd = pipefd[0];
wfd = pipefd[1];
/*
* Some platforms (FreeBSD) implement bidirectional pipes, affecting the
* behavior of some of these tests. Store that knowledge for later.
*/
bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
/*
* This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
* read/write operations, respectively, and once using CURL_POLL_INOUT for
* both sides.
*/
for (int inout = 0; inout < 2; inout++)
{
const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
size_t bidi_pipe_size = 0; /* silence compiler warnings */
printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
/*
* At the start of the test, the read side should be blocked and the
* write side should be open. (There's a mistake at the end of this
* loop otherwise.)
*/
Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
/*
* For bidirectional systems, emulate unidirectional behavior here by
* filling up the "read side" of the pipe.
*/
if (bidirectional)
Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
/* Listen on the read side. The multiplexer shouldn't be ready yet. */
Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
mux_is_not_ready(actx->mux, "when fd is not readable");
/* Writing to the pipe should result in a read-ready multiplexer. */
Assert(write(wfd, "x", 1) == 1);
mux_is_ready(actx->mux, deadline, "when fd is readable");
/*
* Update the registration to wait on write events instead. The
* multiplexer should be unset.
*/
Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
/* Re-register for read events. */
Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
mux_is_ready(actx->mux, deadline, "when waiting for reads again");
/* Stop listening. The multiplexer should be unset. */
Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
mux_is_not_ready(actx->mux, "when readable fd is removed");
/* Listen again. */
Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
/*
* Draining the pipe should unset the multiplexer again, once the old
* event is cleared.
*/
Assert(drain_pipe(rfd, 1));
Assert(comb_multiplexer(actx));
mux_is_not_ready(actx->mux, "when fd is drained");
/* Undo any unidirectional emulation. */
if (bidirectional)
Assert(drain_pipe(wfd, bidi_pipe_size));
/* Listen on the write side. An empty buffer should be writable. */
Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
mux_is_ready(actx->mux, deadline, "when fd is writable");
/* As above, wait on read events instead. */
Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
/* Re-register for write events. */
Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
mux_is_ready(actx->mux, deadline, "when waiting for writes again");
{
ssize_t written;
/*
* Fill the pipe. Once the old writable event is cleared, the mux
* should not be ready.
*/
Assert((written = fill_pipe(wfd)) > 0);
printf("# pipe buffer is full at %zd bytes\n", written);
Assert(comb_multiplexer(actx));
mux_is_not_ready(actx->mux, "when fd buffer is full");
/* Drain the pipe again. */
Assert(drain_pipe(rfd, written));
mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
}
/* Stop listening. */
Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
mux_is_not_ready(actx->mux, "when fd is removed");
/* Make sure an expired timer doesn't interfere with event draining. */
{
bool expired;
/* Make the rfd appear unidirectional if necessary. */
if (bidirectional)
Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
/* Set the timer and wait for it to expire. */
Assert(set_timer(actx, 0));
Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
is(timer_expired(actx), 1, "timer is expired");
/* Register for read events and make the fd readable. */
Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
Assert(write(wfd, "x", 1) == 1);
mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
/*
* Draining the pipe should unset the multiplexer again, once the
* old event is drained and the timer is reset.
*
* Order matters, since comb_multiplexer() doesn't have to remove
* stale events when active events exist. Follow the call sequence
* used in the code: drain the timer expiration, drain the pipe,
* then clear the stale events.
*/
Assert(drain_timer_events(actx, &expired));
Assert(drain_pipe(rfd, 1));
Assert(comb_multiplexer(actx));
is(expired, 1, "drain_timer_events() reports expiration");
is(timer_expired(actx), 0, "timer is no longer expired");
mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
/* Stop listening. */
Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
/* Undo any unidirectional emulation. */
if (bidirectional)
Assert(drain_pipe(wfd, bidi_pipe_size));
}
/* Ensure comb_multiplexer() can handle multiple stale events. */
{
int rfd2,
wfd2;
/* Create a second local pipe. */
Assert(pipe(pipefd) == 0);
rfd2 = pipefd[0];
wfd2 = pipefd[1];
/* Make both rfds appear unidirectional if necessary. */
if (bidirectional)
{
Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
Assert(fill_pipe(rfd2) == bidi_pipe_size);
}
/* Register for read events on both fds, and make them readable. */
Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
Assert(write(wfd, "x", 1) == 1);
Assert(write(wfd2, "x", 1) == 1);
mux_is_ready(actx->mux, deadline, "when two fds are readable");
/*
* Drain both fds. comb_multiplexer() should then ensure that the
* mux is no longer readable.
*/
Assert(drain_pipe(rfd, 1));
Assert(drain_pipe(rfd2, 1));
Assert(comb_multiplexer(actx));
mux_is_not_ready(actx->mux, "when two fds are drained");
/* Stop listening. */
Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
/* Undo any unidirectional emulation. */
if (bidirectional)
{
Assert(drain_pipe(wfd, bidi_pipe_size));
Assert(drain_pipe(wfd2, bidi_pipe_size));
}
close(rfd2);
close(wfd2);
}
}
close(rfd);
close(wfd);
free_test_actx(actx);
}
int
main(int argc, char *argv[])
{
const char *timeout;
/* Grab the default timeout. */
timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
if (timeout)
{
int timeout_s = atoi(timeout);
if (timeout_s > 0)
timeout_us = timeout_s * 1000 * 1000;
}
/*
* Set up line buffering for our output, to let stderr interleave in the
* log files.
*/
setvbuf(stdout, NULL, PG_IOLBF, 0);
test_set_timer();
test_register_socket();
printf("1..%d\n", num_tests);
return 0;
}
#else /* !USE_ASSERT_CHECKING */
/*
* Skip the test suite when we don't have assertions.
*/
int
main(int argc, char *argv[])
{
printf("1..0 # skip: cassert is not enabled\n");
return 0;
}
#endif /* USE_ASSERT_CHECKING */
Loading…
Cancel
Save