mirror of https://github.com/postgres/postgres
LZ4 compression can now be performed on the client using pg_basebackup -Ft --compress client-lz4, and LZ4 decompression of a backup compressed on the server can be performed on the client using pg_basebackup -Fp --compress server-lz4. Dipesh Pandit, reviewed and tested by Jeevan Ladhe and Tushar Ahuja, with a few corrections - and some documentation - by me. Discussion: http://postgr.es/m/CAN1g5_FeDmiA9D8wdG2W6Lkq5CpubxOAqTmd2et9hsinTJtsMQ@mail.gmail.compull/77/head
parent
dab298471f
commit
751b8d23b7
@ -0,0 +1,431 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* bbstreamer_lz4.c |
||||
* |
||||
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group |
||||
* |
||||
* IDENTIFICATION |
||||
* src/bin/pg_basebackup/bbstreamer_lz4.c |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres_fe.h" |
||||
|
||||
#include <unistd.h> |
||||
|
||||
#ifdef HAVE_LIBLZ4 |
||||
#include <lz4frame.h> |
||||
#endif |
||||
|
||||
#include "bbstreamer.h" |
||||
#include "common/logging.h" |
||||
#include "common/file_perm.h" |
||||
#include "common/string.h" |
||||
|
||||
#ifdef HAVE_LIBLZ4 |
||||
typedef struct bbstreamer_lz4_frame |
||||
{ |
||||
bbstreamer base; |
||||
|
||||
LZ4F_compressionContext_t cctx; |
||||
LZ4F_decompressionContext_t dctx; |
||||
LZ4F_preferences_t prefs; |
||||
|
||||
size_t bytes_written; |
||||
bool header_written; |
||||
} bbstreamer_lz4_frame; |
||||
|
||||
static void bbstreamer_lz4_compressor_content(bbstreamer *streamer, |
||||
bbstreamer_member *member, |
||||
const char *data, int len, |
||||
bbstreamer_archive_context context); |
||||
static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer); |
||||
static void bbstreamer_lz4_compressor_free(bbstreamer *streamer); |
||||
|
||||
const bbstreamer_ops bbstreamer_lz4_compressor_ops = { |
||||
.content = bbstreamer_lz4_compressor_content, |
||||
.finalize = bbstreamer_lz4_compressor_finalize, |
||||
.free = bbstreamer_lz4_compressor_free |
||||
}; |
||||
|
||||
static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer, |
||||
bbstreamer_member *member, |
||||
const char *data, int len, |
||||
bbstreamer_archive_context context); |
||||
static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer); |
||||
static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer); |
||||
|
||||
const bbstreamer_ops bbstreamer_lz4_decompressor_ops = { |
||||
.content = bbstreamer_lz4_decompressor_content, |
||||
.finalize = bbstreamer_lz4_decompressor_finalize, |
||||
.free = bbstreamer_lz4_decompressor_free |
||||
}; |
||||
#endif |
||||
|
||||
/*
|
||||
* Create a new base backup streamer that performs lz4 compression of tar |
||||
* blocks. |
||||
*/ |
||||
bbstreamer * |
||||
bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel) |
||||
{ |
||||
#ifdef HAVE_LIBLZ4 |
||||
bbstreamer_lz4_frame *streamer; |
||||
LZ4F_errorCode_t ctxError; |
||||
LZ4F_preferences_t *prefs; |
||||
size_t compressed_bound; |
||||
|
||||
Assert(next != NULL); |
||||
|
||||
streamer = palloc0(sizeof(bbstreamer_lz4_frame)); |
||||
*((const bbstreamer_ops **) &streamer->base.bbs_ops) = |
||||
&bbstreamer_lz4_compressor_ops; |
||||
|
||||
streamer->base.bbs_next = next; |
||||
initStringInfo(&streamer->base.bbs_buffer); |
||||
streamer->header_written = false; |
||||
|
||||
/* Initialize stream compression preferences */ |
||||
prefs = &streamer->prefs; |
||||
memset(prefs, 0, sizeof(LZ4F_preferences_t)); |
||||
prefs->frameInfo.blockSizeID = LZ4F_max256KB; |
||||
prefs->compressionLevel = compresslevel; |
||||
|
||||
/*
|
||||
* Find out the compression bound, it specifies the minimum destination |
||||
* capacity required in worst case for the success of compression operation |
||||
* (LZ4F_compressUpdate) based on a given source size and preferences. |
||||
*/ |
||||
compressed_bound = LZ4F_compressBound(streamer->base.bbs_buffer.maxlen, prefs); |
||||
|
||||
/* Enlarge buffer if it falls short of compression bound. */ |
||||
if (streamer->base.bbs_buffer.maxlen <= compressed_bound) |
||||
enlargeStringInfo(&streamer->base.bbs_buffer, compressed_bound); |
||||
|
||||
ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION); |
||||
if (LZ4F_isError(ctxError)) |
||||
pg_log_error("could not create lz4 compression context: %s", |
||||
LZ4F_getErrorName(ctxError)); |
||||
|
||||
return &streamer->base; |
||||
#else |
||||
pg_log_error("this build does not support compression"); |
||||
exit(1); |
||||
#endif |
||||
} |
||||
|
||||
#ifdef HAVE_LIBLZ4 |
||||
/*
|
||||
* Compress the input data to output buffer. |
||||
* |
||||
* Find out the compression bound based on input data length for each |
||||
* invocation to make sure that output buffer has enough capacity to |
||||
* accommodate the compressed data. In case if the output buffer |
||||
* capacity falls short of compression bound then forward the content |
||||
* of output buffer to next streamer and empty the buffer. |
||||
*/ |
||||
static void |
||||
bbstreamer_lz4_compressor_content(bbstreamer *streamer, |
||||
bbstreamer_member *member, |
||||
const char *data, int len, |
||||
bbstreamer_archive_context context) |
||||
{ |
||||
bbstreamer_lz4_frame *mystreamer; |
||||
uint8 *next_in, |
||||
*next_out; |
||||
size_t out_bound, |
||||
compressed_size, |
||||
avail_out; |
||||
|
||||
mystreamer = (bbstreamer_lz4_frame *) streamer; |
||||
next_in = (uint8 *) data; |
||||
|
||||
/* Write header before processing the first input chunk. */ |
||||
if (!mystreamer->header_written) |
||||
{ |
||||
compressed_size = LZ4F_compressBegin(mystreamer->cctx, |
||||
(uint8 *) mystreamer->base.bbs_buffer.data, |
||||
mystreamer->base.bbs_buffer.maxlen, |
||||
&mystreamer->prefs); |
||||
|
||||
if (LZ4F_isError(compressed_size)) |
||||
pg_log_error("could not write lz4 header: %s", |
||||
LZ4F_getErrorName(compressed_size)); |
||||
|
||||
mystreamer->bytes_written += compressed_size; |
||||
mystreamer->header_written = true; |
||||
} |
||||
|
||||
/*
|
||||
* Update the offset and capacity of output buffer based on number of bytes |
||||
* written to output buffer. |
||||
*/ |
||||
next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; |
||||
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; |
||||
|
||||
/*
|
||||
* Find out the compression bound and make sure that output buffer has the |
||||
* required capacity for the success of LZ4F_compressUpdate. If needed |
||||
* forward the content to next streamer and empty the buffer. |
||||
*/ |
||||
out_bound = LZ4F_compressBound(len, &mystreamer->prefs); |
||||
Assert(mystreamer->base.bbs_buffer.maxlen >= out_bound); |
||||
if (avail_out <= out_bound) |
||||
{ |
||||
bbstreamer_content(mystreamer->base.bbs_next, member, |
||||
mystreamer->base.bbs_buffer.data, |
||||
mystreamer->bytes_written, |
||||
context); |
||||
|
||||
avail_out = mystreamer->base.bbs_buffer.maxlen; |
||||
mystreamer->bytes_written = 0; |
||||
next_out = (uint8 *) mystreamer->base.bbs_buffer.data; |
||||
} |
||||
|
||||
/*
|
||||
* This call compresses the data starting at next_in and generates the |
||||
* output starting at next_out. It expects the caller to provide the size |
||||
* of input buffer and capacity of output buffer by providing parameters |
||||
* len and avail_out. |
||||
* |
||||
* It returns the number of bytes compressed to output buffer. |
||||
*/ |
||||
compressed_size = LZ4F_compressUpdate(mystreamer->cctx, |
||||
next_out, avail_out, |
||||
next_in, len, NULL); |
||||
|
||||
if (LZ4F_isError(compressed_size)) |
||||
pg_log_error("could not compress data: %s", |
||||
LZ4F_getErrorName(compressed_size)); |
||||
|
||||
mystreamer->bytes_written += compressed_size; |
||||
} |
||||
|
||||
/*
|
||||
* End-of-stream processing. |
||||
*/ |
||||
static void |
||||
bbstreamer_lz4_compressor_finalize(bbstreamer *streamer) |
||||
{ |
||||
bbstreamer_lz4_frame *mystreamer; |
||||
uint8 *next_out; |
||||
size_t footer_bound, |
||||
compressed_size, |
||||
avail_out; |
||||
|
||||
mystreamer = (bbstreamer_lz4_frame *) streamer; |
||||
|
||||
/* Find out the footer bound and update the output buffer. */ |
||||
footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); |
||||
Assert(mystreamer->base.bbs_buffer.maxlen >= footer_bound); |
||||
if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <= |
||||
footer_bound) |
||||
{ |
||||
bbstreamer_content(mystreamer->base.bbs_next, NULL, |
||||
mystreamer->base.bbs_buffer.data, |
||||
mystreamer->bytes_written, |
||||
BBSTREAMER_UNKNOWN); |
||||
|
||||
avail_out = mystreamer->base.bbs_buffer.maxlen; |
||||
mystreamer->bytes_written = 0; |
||||
next_out = (uint8 *) mystreamer->base.bbs_buffer.data; |
||||
} |
||||
else |
||||
{ |
||||
next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; |
||||
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; |
||||
} |
||||
|
||||
/*
|
||||
* Finalize the frame and flush whatever data remaining in compression |
||||
* context. |
||||
*/ |
||||
compressed_size = LZ4F_compressEnd(mystreamer->cctx, |
||||
next_out, avail_out, NULL); |
||||
|
||||
if (LZ4F_isError(compressed_size)) |
||||
pg_log_error("could not end lz4 compression: %s", |
||||
LZ4F_getErrorName(compressed_size)); |
||||
|
||||
mystreamer->bytes_written += compressed_size; |
||||
|
||||
bbstreamer_content(mystreamer->base.bbs_next, NULL, |
||||
mystreamer->base.bbs_buffer.data, |
||||
mystreamer->bytes_written, |
||||
BBSTREAMER_UNKNOWN); |
||||
|
||||
bbstreamer_finalize(mystreamer->base.bbs_next); |
||||
} |
||||
|
||||
/*
|
||||
* Free memory. |
||||
*/ |
||||
static void |
||||
bbstreamer_lz4_compressor_free(bbstreamer *streamer) |
||||
{ |
||||
bbstreamer_lz4_frame *mystreamer; |
||||
|
||||
mystreamer = (bbstreamer_lz4_frame *) streamer; |
||||
bbstreamer_free(streamer->bbs_next); |
||||
LZ4F_freeCompressionContext(mystreamer->cctx); |
||||
pfree(streamer->bbs_buffer.data); |
||||
pfree(streamer); |
||||
} |
||||
#endif |
||||
|
||||
/*
|
||||
* Create a new base backup streamer that performs decompression of lz4 |
||||
* compressed blocks. |
||||
*/ |
||||
bbstreamer * |
||||
bbstreamer_lz4_decompressor_new(bbstreamer *next) |
||||
{ |
||||
#ifdef HAVE_LIBLZ4 |
||||
bbstreamer_lz4_frame *streamer; |
||||
LZ4F_errorCode_t ctxError; |
||||
|
||||
Assert(next != NULL); |
||||
|
||||
streamer = palloc0(sizeof(bbstreamer_lz4_frame)); |
||||
*((const bbstreamer_ops **) &streamer->base.bbs_ops) = |
||||
&bbstreamer_lz4_decompressor_ops; |
||||
|
||||
streamer->base.bbs_next = next; |
||||
initStringInfo(&streamer->base.bbs_buffer); |
||||
|
||||
/* Initialize internal stream state for decompression */ |
||||
ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION); |
||||
if (LZ4F_isError(ctxError)) |
||||
{ |
||||
pg_log_error("could not initialize compression library: %s", |
||||
LZ4F_getErrorName(ctxError)); |
||||
exit(1); |
||||
} |
||||
|
||||
return &streamer->base; |
||||
#else |
||||
pg_log_error("this build does not support compression"); |
||||
exit(1); |
||||
#endif |
||||
} |
||||
|
||||
#ifdef HAVE_LIBLZ4 |
||||
/*
|
||||
* Decompress the input data to output buffer until we run out of input |
||||
* data. Each time the output buffer is full, pass on the decompressed data |
||||
* to the next streamer. |
||||
*/ |
||||
static void |
||||
bbstreamer_lz4_decompressor_content(bbstreamer *streamer, |
||||
bbstreamer_member *member, |
||||
const char *data, int len, |
||||
bbstreamer_archive_context context) |
||||
{ |
||||
bbstreamer_lz4_frame *mystreamer; |
||||
uint8 *next_in, |
||||
*next_out; |
||||
size_t avail_in, |
||||
avail_out; |
||||
|
||||
mystreamer = (bbstreamer_lz4_frame *) streamer; |
||||
next_in = (uint8 *) data; |
||||
next_out = (uint8 *) mystreamer->base.bbs_buffer.data; |
||||
avail_in = len; |
||||
avail_out = mystreamer->base.bbs_buffer.maxlen; |
||||
|
||||
while (avail_in > 0) |
||||
{ |
||||
size_t ret, |
||||
read_size, |
||||
out_size; |
||||
|
||||
read_size = avail_in; |
||||
out_size = avail_out; |
||||
|
||||
/*
|
||||
* This call decompresses the data starting at next_in and generates |
||||
* the output data starting at next_out. It expects the caller to |
||||
* provide size of the input buffer and total capacity of the output |
||||
* buffer by providing the read_size and out_size parameters |
||||
* respectively. |
||||
* |
||||
* Per the documentation of LZ4, parameters read_size and out_size |
||||
* behaves as dual parameters. On return, the number of bytes consumed |
||||
* from the input buffer will be written back to read_size and the |
||||
* number of bytes decompressed to output buffer will be written back |
||||
* to out_size respectively. |
||||
*/ |
||||
ret = LZ4F_decompress(mystreamer->dctx, |
||||
next_out, &out_size, |
||||
next_in, &read_size, NULL); |
||||
|
||||
if (LZ4F_isError(ret)) |
||||
pg_log_error("could not decompress data: %s", |
||||
LZ4F_getErrorName(ret)); |
||||
|
||||
/* Update input buffer based on number of bytes consumed */ |
||||
avail_in -= read_size; |
||||
next_in += read_size; |
||||
|
||||
mystreamer->bytes_written += out_size; |
||||
|
||||
/*
|
||||
* If output buffer is full then forward the content to next streamer and |
||||
* update the output buffer. |
||||
*/ |
||||
if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) |
||||
{ |
||||
bbstreamer_content(mystreamer->base.bbs_next, member, |
||||
mystreamer->base.bbs_buffer.data, |
||||
mystreamer->base.bbs_buffer.maxlen, |
||||
context); |
||||
|
||||
avail_out = mystreamer->base.bbs_buffer.maxlen; |
||||
mystreamer->bytes_written = 0; |
||||
next_out = (uint8 *) mystreamer->base.bbs_buffer.data; |
||||
} |
||||
else |
||||
{ |
||||
avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; |
||||
next_out += mystreamer->bytes_written; |
||||
} |
||||
} |
||||
} |
||||
|
||||
/*
|
||||
* End-of-stream processing. |
||||
*/ |
||||
static void |
||||
bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer) |
||||
{ |
||||
bbstreamer_lz4_frame *mystreamer; |
||||
|
||||
mystreamer = (bbstreamer_lz4_frame *) streamer; |
||||
|
||||
/*
|
||||
* End of the stream, if there is some pending data in output buffers then |
||||
* we must forward it to next streamer. |
||||
*/ |
||||
bbstreamer_content(mystreamer->base.bbs_next, NULL, |
||||
mystreamer->base.bbs_buffer.data, |
||||
mystreamer->base.bbs_buffer.maxlen, |
||||
BBSTREAMER_UNKNOWN); |
||||
|
||||
bbstreamer_finalize(mystreamer->base.bbs_next); |
||||
} |
||||
|
||||
/*
|
||||
* Free memory. |
||||
*/ |
||||
static void |
||||
bbstreamer_lz4_decompressor_free(bbstreamer *streamer) |
||||
{ |
||||
bbstreamer_lz4_frame *mystreamer; |
||||
|
||||
mystreamer = (bbstreamer_lz4_frame *) streamer; |
||||
bbstreamer_free(streamer->bbs_next); |
||||
LZ4F_freeDecompressionContext(mystreamer->dctx); |
||||
pfree(streamer->bbs_buffer.data); |
||||
pfree(streamer); |
||||
} |
||||
#endif |
||||
@ -0,0 +1,111 @@ |
||||
# Copyright (c) 2021-2022, PostgreSQL Global Development Group |
||||
|
||||
# This test case aims to verify that client-side backup compression work |
||||
# properly, and it also aims to verify that pg_verifybackup can verify a base |
||||
# backup that didn't start out in plain format. |
||||
|
||||
use strict; |
||||
use warnings; |
||||
use Config; |
||||
use File::Path qw(rmtree); |
||||
use PostgreSQL::Test::Cluster; |
||||
use PostgreSQL::Test::Utils; |
||||
use Test::More tests => 9; |
||||
|
||||
my $primary = PostgreSQL::Test::Cluster->new('primary'); |
||||
$primary->init(allows_streaming => 1); |
||||
$primary->start; |
||||
|
||||
my $backup_path = $primary->backup_dir . '/client-backup'; |
||||
my $extract_path = $primary->backup_dir . '/extracted-backup'; |
||||
|
||||
my @test_configuration = ( |
||||
{ |
||||
'compression_method' => 'none', |
||||
'backup_flags' => [], |
||||
'backup_archive' => 'base.tar', |
||||
'enabled' => 1 |
||||
}, |
||||
{ |
||||
'compression_method' => 'gzip', |
||||
'backup_flags' => ['--compress', 'client-gzip:5'], |
||||
'backup_archive' => 'base.tar.gz', |
||||
'decompress_program' => $ENV{'GZIP_PROGRAM'}, |
||||
'decompress_flags' => [ '-d' ], |
||||
'enabled' => check_pg_config("#define HAVE_LIBZ 1") |
||||
}, |
||||
{ |
||||
'compression_method' => 'lz4', |
||||
'backup_flags' => ['--compress', 'client-lz4:5'], |
||||
'backup_archive' => 'base.tar.lz4', |
||||
'decompress_program' => $ENV{'LZ4'}, |
||||
'decompress_flags' => [ '-d' ], |
||||
'output_file' => 'base.tar', |
||||
'enabled' => check_pg_config("#define HAVE_LIBLZ4 1") |
||||
} |
||||
); |
||||
|
||||
for my $tc (@test_configuration) |
||||
{ |
||||
my $method = $tc->{'compression_method'}; |
||||
|
||||
SKIP: { |
||||
skip "$method compression not supported by this build", 3 |
||||
if ! $tc->{'enabled'}; |
||||
skip "no decompressor available for $method", 3 |
||||
if exists $tc->{'decompress_program'} && |
||||
!defined $tc->{'decompress_program'}; |
||||
|
||||
# Take a client-side backup. |
||||
my @backup = ( |
||||
'pg_basebackup', '-D', $backup_path, |
||||
'-Xfetch', '--no-sync', '-cfast', '-Ft'); |
||||
push @backup, @{$tc->{'backup_flags'}}; |
||||
$primary->command_ok(\@backup, |
||||
"client side backup, compression $method"); |
||||
|
||||
|
||||
# Verify that the we got the files we expected. |
||||
my $backup_files = join(',', |
||||
sort grep { $_ ne '.' && $_ ne '..' } slurp_dir($backup_path)); |
||||
my $expected_backup_files = join(',', |
||||
sort ('backup_manifest', $tc->{'backup_archive'})); |
||||
is($backup_files,$expected_backup_files, |
||||
"found expected backup files, compression $method"); |
||||
|
||||
# Decompress. |
||||
if (exists $tc->{'decompress_program'}) |
||||
{ |
||||
my @decompress = ($tc->{'decompress_program'}); |
||||
push @decompress, @{$tc->{'decompress_flags'}} |
||||
if $tc->{'decompress_flags'}; |
||||
push @decompress, $backup_path . '/' . $tc->{'backup_archive'}; |
||||
push @decompress, $backup_path . '/' . $tc->{'output_file'} |
||||
if $tc->{'output_file'}; |
||||
system_or_bail(@decompress); |
||||
} |
||||
|
||||
SKIP: { |
||||
my $tar = $ENV{TAR}; |
||||
# don't check for a working tar here, to accomodate various odd |
||||
# cases such as AIX. If tar doesn't work the init_from_backup below |
||||
# will fail. |
||||
skip "no tar program available", 1 |
||||
if (!defined $tar || $tar eq ''); |
||||
|
||||
# Untar. |
||||
mkdir($extract_path); |
||||
system_or_bail($tar, 'xf', $backup_path . '/base.tar', |
||||
'-C', $extract_path); |
||||
|
||||
# Verify. |
||||
$primary->command_ok([ 'pg_verifybackup', '-n', |
||||
'-m', "$backup_path/backup_manifest", '-e', $extract_path ], |
||||
"verify backup, compression $method"); |
||||
} |
||||
|
||||
# Cleanup. |
||||
rmtree($extract_path); |
||||
rmtree($backup_path); |
||||
} |
||||
} |
||||
Loading…
Reference in new issue