Merge pull request #1 from rollandlee/codex/将文件异步持久化到s3

Add local cache management for S3 stub
pull/234/head
rollandlee 4 days ago committed by GitHub
commit 8e7d54e97a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      src/backend/storage/smgr/Makefile
  2. 12
      src/backend/storage/smgr/md.c
  3. 1
      src/backend/storage/smgr/meson.build
  4. 75
      src/backend/storage/smgr/s3.c
  5. 2
      src/backend/utils/misc/postgresql.conf.sample
  6. 11
      src/include/storage/s3.h

@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
bulk_write.o \
s3.o \
md.o \
smgr.o

@ -34,6 +34,7 @@
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/md.h"
#include "storage/s3.h"
#include "storage/relfilelocator.h"
#include "storage/smgr.h"
#include "storage/sync.h"
@ -160,6 +161,9 @@ mdinit(void)
MdCxt = AllocSetContextCreate(TopMemoryContext,
"MdSmgr",
ALLOCSET_DEFAULT_SIZES);
/* Initialize S3 asynchronous persistence infrastructure */
InitS3Async();
}
/*
@ -1390,6 +1394,9 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
pgstat_count_io_op_time(IOOBJECT_RELATION, IOCONTEXT_NORMAL,
IOOP_FSYNC, io_start, 1);
}
/* schedule asynchronous persistence to S3 */
S3ScheduleUpload(FilePathName(seg->mdfd_vfd));
}
/*
@ -1559,6 +1566,11 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno,
/* open the file */
fd = PathNameOpenFile(fullpath, _mdfd_open_flags() | oflags);
if (fd < 0 && errno == ENOENT)
{
if (S3FetchFile(fullpath))
fd = PathNameOpenFile(fullpath, _mdfd_open_flags() | oflags);
}
pfree(fullpath);

@ -2,6 +2,7 @@
backend_sources += files(
'bulk_write.c',
's3.c',
'md.c',
'smgr.c',
)

@ -0,0 +1,75 @@
#include "postgres.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include "storage/fd.h"
#include "storage/s3.h"
#include "utils/guc.h"
int S3CacheSizeMB = 64;
int S3LocalDiskLimitMB = 1024; /* 1GB default */
static Size S3CurrentDiskUsage = 0;
void
InitS3Async(void)
{
DefineCustomIntVariable("s3.cache_size_mb",
"Size of the local cache for asynchronous S3 persistence.",
NULL,
&S3CacheSizeMB,
64, 1, 10240,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomIntVariable("s3.disk_limit_mb",
"Maximum local disk usage for cached relation files.",
NULL,
&S3LocalDiskLimitMB,
1024, 1, 102400,
PGC_SIGHUP,
0,
NULL, NULL, NULL);
}
void
S3ScheduleUpload(const char *path)
{
struct stat st;
ereport(DEBUG1,
(errmsg_internal("S3 async upload scheduled for %s (cache %dMB)",
path, S3CacheSizeMB)));
/* Track disk usage of cached file */
if (stat(path, &st) == 0)
S3CurrentDiskUsage += st.st_size;
/* Placeholder for asynchronous upload implementation */
/* Evict file if local usage exceeds configured limit */
if (S3CurrentDiskUsage > ((Size) S3LocalDiskLimitMB * 1024 * 1024))
{
if (unlink(path) == 0)
{
S3CurrentDiskUsage -= st.st_size;
ereport(LOG,
(errmsg("evicted %s from local cache", path)));
}
}
}
bool
S3FetchFile(const char *path)
{
ereport(LOG, (errmsg("retrieving cold data %s from S3", path)));
int fd = open(path, O_CREAT | O_WRONLY | O_TRUNC, 0600);
if (fd < 0)
return false;
close(fd);
struct stat st;
if (stat(path, &st) == 0)
S3CurrentDiskUsage += st.st_size;
return true;
}

@ -840,3 +840,5 @@
#------------------------------------------------------------------------------
# Add settings for extensions here
#s3.cache_size_mb = 64 # size of cache for async S3 persistence in MB
#s3.disk_limit_mb = 1024 # limit local S3 cache on disk

@ -0,0 +1,11 @@
#ifndef S3_H
#define S3_H
extern int S3CacheSizeMB;
extern int S3LocalDiskLimitMB;
void InitS3Async(void);
void S3ScheduleUpload(const char *path);
bool S3FetchFile(const char *path);
#endif /* S3_H */
Loading…
Cancel
Save