From d46b41d7c89deb23a6a1afec9d7fe3544b9a3327 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 20 Sep 2023 14:23:38 -0500 Subject: [PATCH v1 4/5] Add contrib/fsync_checker fsync_checker is an extension which overrides the global storage manager to check for volatile relations, those which have been written but not synced to disk. --- contrib/Makefile | 1 + contrib/fsync_checker/fsync_checker.control | 5 + contrib/fsync_checker/fsync_checker_smgr.c | 249 ++++++++++++++++++++ contrib/fsync_checker/meson.build | 22 ++ contrib/meson.build | 1 + 5 files changed, 278 insertions(+) create mode 100644 contrib/fsync_checker/fsync_checker.control create mode 100644 contrib/fsync_checker/fsync_checker_smgr.c create mode 100644 contrib/fsync_checker/meson.build diff --git a/contrib/Makefile b/contrib/Makefile index da4e2316a3..c55ced6ec0 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -20,6 +20,7 @@ SUBDIRS = \ dict_int \ dict_xsyn \ earthdistance \ + fsync_checker \ file_fdw \ fuzzystrmatch \ hstore \ diff --git a/contrib/fsync_checker/fsync_checker.control b/contrib/fsync_checker/fsync_checker.control new file mode 100644 index 0000000000..7d0e36434b --- /dev/null +++ b/contrib/fsync_checker/fsync_checker.control @@ -0,0 +1,5 @@ +# fsync_checker extension +comment = 'SMGR extension for checking volatile writes' +default_version = '1.0' +module_pathname = '$libdir/fsync_checker' +relocatable = true diff --git a/contrib/fsync_checker/fsync_checker_smgr.c b/contrib/fsync_checker/fsync_checker_smgr.c new file mode 100644 index 0000000000..feef2f7d3e --- /dev/null +++ b/contrib/fsync_checker/fsync_checker_smgr.c @@ -0,0 +1,249 @@ +#include "postgres.h" + +#include "access/xlog.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "storage/md.h" +#include "utils/hsearch.h" + +PG_MODULE_MAGIC; + +typedef struct volatileRelnKey +{ + RelFileLocator locator; + ForkNumber forknum; +} volatileRelnKey; + +typedef struct volatileRelnEntry +{ + volatileRelnKey key; + XLogRecPtr lsn; +} volatileRelnEntry; + +void _PG_init(void); + +static void fsync_checker_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void *buffer, bool skipFsync); +static void fsync_checker_immedsync(SMgrRelation reln, ForkNumber forknum); +static void fsync_checker_writev(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, const void **buffers, + BlockNumber nblocks, bool skipFsync); +static void fsync_checker_writeback(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, BlockNumber nblocks); +static void fsync_checker_zeroextend(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, int nblocks, bool skipFsync); + +static void fsync_checker_checkpoint_create(const CheckPoint *checkPoint); +static void fsync_checker_shmem_request(void); +static void fsync_checker_shmem_startup(void); + +static void add_reln(SMgrRelation reln, ForkNumber forknum); +static void remove_reln(SMgrRelation reln, ForkNumber forknum); + +static SMgrId fsync_checker_smgr_id; +static const struct f_smgr fsync_checker_smgr = { + .name = "fsync_checker", + .smgr_init = mdinit, + .smgr_shutdown = NULL, + .smgr_open = mdopen, + .smgr_close = mdclose, + .smgr_create = mdcreate, + .smgr_exists = mdexists, + .smgr_unlink = mdunlink, + .smgr_extend = fsync_checker_extend, + .smgr_zeroextend = fsync_checker_zeroextend, + .smgr_prefetch = mdprefetch, + .smgr_readv = mdreadv, + .smgr_writev = fsync_checker_writev, + .smgr_writeback = fsync_checker_writeback, + .smgr_nblocks = mdnblocks, + .smgr_truncate = mdtruncate, + .smgr_immedsync = fsync_checker_immedsync, +}; + +static HTAB *volatile_relns; +static LWLock *volatile_relns_lock; +static shmem_request_hook_type prev_shmem_request_hook; +static shmem_startup_hook_type prev_shmem_startup_hook; +static checkpoint_create_hook_type prev_checkpoint_create_hook; + +void +_PG_init(void) +{ + prev_checkpoint_create_hook = checkpoint_create_hook; + checkpoint_create_hook = fsync_checker_checkpoint_create; + + prev_shmem_request_hook = shmem_request_hook; + shmem_request_hook = fsync_checker_shmem_request; + + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = fsync_checker_shmem_startup; + + /* + * Relation size of 0 means we can just defer to md, but it would be nice + * to just expose this functionality, so if I needed my own relation, I + * could use MdSmgrRelation as the parent. + */ + fsync_checker_smgr_id = smgr_register(&fsync_checker_smgr, 0); + + storage_manager_id = fsync_checker_smgr_id; +} + +static void +fsync_checker_checkpoint_create(const CheckPoint *checkPoint) +{ + long num_entries; + HASH_SEQ_STATUS status; + volatileRelnEntry *entry; + + if (prev_checkpoint_create_hook) + prev_checkpoint_create_hook(checkPoint); + + LWLockAcquire(volatile_relns_lock, LW_EXCLUSIVE); + + hash_seq_init(&status, volatile_relns); + + num_entries = hash_get_num_entries(volatile_relns); + elog(INFO, "Analyzing %ld volatile relations", num_entries); + while ((entry = hash_seq_search(&status))) + { + if (entry->lsn < checkPoint->redo) + { + char *path; + + path = relpathperm(entry->key.locator, entry->key.forknum); + + elog(WARNING, "Relation not previously synced: %s", path); + + pfree(path); + } + } + + LWLockRelease(volatile_relns_lock); +} + +static void +fsync_checker_shmem_request(void) +{ + if (prev_shmem_request_hook) + prev_shmem_request_hook(); + + RequestAddinShmemSpace(hash_estimate_size(1024, sizeof(volatileRelnEntry))); + RequestNamedLWLockTranche("fsync_checker volatile relns lock", 1); +} + +static void +fsync_checker_shmem_startup(void) +{ + HASHCTL ctl; + + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + + ctl.keysize = sizeof(volatileRelnKey); + ctl.entrysize = sizeof(volatileRelnEntry); + volatile_relns = NULL; + volatile_relns_lock = NULL; + + /* + * Create or attach to the shared memory state, including hash table + */ + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + volatile_relns = ShmemInitHash("fsync_checker volatile relns", + 1024, 1024, &ctl, HASH_BLOBS | HASH_ELEM); + volatile_relns_lock = &GetNamedLWLockTranche("fsync_checker volatile relns lock")->lock; + + LWLockRelease(AddinShmemInitLock); +} + +static void +add_reln(SMgrRelation reln, ForkNumber forknum) +{ + bool found; + XLogRecPtr lsn; + volatileRelnKey key; + volatileRelnEntry *entry; + + key.locator = reln->smgr_rlocator.locator; + key.forknum = forknum; + + lsn = GetXLogWriteRecPtr(); + + LWLockAcquire(volatile_relns_lock, LW_EXCLUSIVE); + + entry = hash_search(volatile_relns, &key, HASH_ENTER, &found); + if (!found) + entry->lsn = lsn; + + LWLockRelease(volatile_relns_lock); +} + +static void +remove_reln(SMgrRelation reln, ForkNumber forknum) +{ + volatileRelnKey key; + + key.locator = reln->smgr_rlocator.locator; + key.forknum = forknum; + + LWLockAcquire(volatile_relns_lock, LW_EXCLUSIVE); + + hash_search(volatile_relns, &key, HASH_REMOVE, NULL); + + LWLockRelease(volatile_relns_lock); +} + +static void +fsync_checker_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + const void *buffer, bool skipFsync) +{ + if (!SmgrIsTemp(reln) && !skipFsync) + add_reln(reln, forknum); + + mdextend(reln, forknum, blocknum, buffer, skipFsync); +} + +static void +fsync_checker_immedsync(SMgrRelation reln, ForkNumber forknum) +{ + if (!SmgrIsTemp(reln)) + remove_reln(reln, forknum); + + mdimmedsync(reln, forknum); +} + +static void +fsync_checker_writev(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, const void **buffers, + BlockNumber nblocks, bool skipFsync) +{ + if (!SmgrIsTemp(reln) && !skipFsync) + add_reln(reln, forknum); + + mdwritev(reln, forknum, blocknum, buffers, nblocks, skipFsync); +} + +static void +fsync_checker_writeback(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, BlockNumber nblocks) +{ + if (!SmgrIsTemp(reln)) + remove_reln(reln, forknum); + + mdwriteback(reln, forknum, blocknum, nblocks); +} + +static void +fsync_checker_zeroextend(SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, int nblocks, bool skipFsync) +{ + if (!SmgrIsTemp(reln) && !skipFsync) + add_reln(reln, forknum); + + mdzeroextend(reln, forknum, blocknum, nblocks, skipFsync); +} diff --git a/contrib/fsync_checker/meson.build b/contrib/fsync_checker/meson.build new file mode 100644 index 0000000000..ce6ed7fe90 --- /dev/null +++ b/contrib/fsync_checker/meson.build @@ -0,0 +1,22 @@ +# Copyright (c) 2023, PostgreSQL Global Development Group + +fsync_checker_sources = files( + 'fsync_checker_smgr.c', +) + +if host_system == 'windows' + fsync_checker_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'fsync_checker', + '--FILEDESC', 'fsync_checker - SMGR extension for checking volatile relations',]) +endif + +fsync_checker = shared_module('fsync_checker', + fsync_checker_sources, + kwargs: contrib_mod_args, +) +contrib_targets += fsync_checker + +install_data( + 'fsync_checker.control', + kwargs: contrib_data_args, +) diff --git a/contrib/meson.build b/contrib/meson.build index c12dc906ca..e5d872494a 100644 --- a/contrib/meson.build +++ b/contrib/meson.build @@ -29,6 +29,7 @@ subdir('dict_int') subdir('dict_xsyn') subdir('earthdistance') subdir('file_fdw') +subdir('fsync_checker') subdir('fuzzystrmatch') subdir('hstore') subdir('hstore_plperl') -- Tristan Partin Neon (https://neon.tech)