@ -60,13 +60,11 @@ typedef struct LZ4State
bool compressing ;
/*
* Used by the Compressor API to mark if the compression headers have been
* written after initialization .
* I / O buffer area .
*/
bool needs_header_flush ;
size_t buflen ;
char * buffer ;
char * buffer ; /* buffer for compressed data */
size_t buflen ; /* allocated size of buffer */
size_t bufdata ; /* amount of valid data currently in buffer */
/*
* Used by the Stream API to store already uncompressed data that the
@ -76,12 +74,6 @@ typedef struct LZ4State
size_t overflowlen ;
char * overflowbuf ;
/*
* Used by both APIs to keep track of the compressed data length stored in
* the buffer .
*/
size_t compressedlen ;
/*
* Used by both APIs to keep track of error codes .
*/
@ -103,8 +95,17 @@ LZ4State_compression_init(LZ4State *state)
{
size_t status ;
/*
* Compute size needed for buffer , assuming we will present at most
* DEFAULT_IO_BUFFER_SIZE input bytes at a time .
*/
state - > buflen = LZ4F_compressBound ( DEFAULT_IO_BUFFER_SIZE , & state - > prefs ) ;
/*
* Then double it , to ensure we ' re not forced to flush every time .
*/
state - > buflen * = 2 ;
/*
* LZ4F_compressBegin requires a buffer that is greater or equal to
* LZ4F_HEADER_SIZE_MAX . Verify that the requirement is met .
@ -120,6 +121,10 @@ LZ4State_compression_init(LZ4State *state)
}
state - > buffer = pg_malloc ( state - > buflen ) ;
/*
* Insert LZ4 header into buffer .
*/
status = LZ4F_compressBegin ( state - > ctx ,
state - > buffer , state - > buflen ,
& state - > prefs ) ;
@ -129,7 +134,7 @@ LZ4State_compression_init(LZ4State *state)
return false ;
}
state - > compressedlen = status ;
state - > bufdata = status ;
return true ;
}
@ -201,36 +206,37 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
{
LZ4State * state = ( LZ4State * ) cs - > private_data ;
size_t remaining = dLen ;
size_t status ;
size_t chunk ;
/* Write the header if not yet written. */
if ( state - > needs_header_flush )
{
cs - > writeF ( AH , state - > buffer , state - > compressedlen ) ;
state - > needs_header_flush = false ;
}
while ( remaining > 0 )
{
size_t chunk ;
size_t required ;
size_t status ;
if ( remaining > DEFAULT_IO_BUFFER_SIZE )
chunk = DEFAULT_IO_BUFFER_SIZE ;
else
chunk = remaining ;
/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
chunk = Min ( remaining , ( size_t ) DEFAULT_IO_BUFFER_SIZE ) ;
/* If not enough space, must flush buffer */
required = LZ4F_compressBound ( chunk , & state - > prefs ) ;
if ( required > state - > buflen - state - > bufdata )
{
cs - > writeF ( AH , state - > buffer , state - > bufdata ) ;
state - > bufdata = 0 ;
}
remaining - = chunk ;
status = LZ4F_compressUpdate ( state - > ctx ,
state - > buffer , state - > buflen ,
state - > buffer + state - > bufdata ,
state - > buflen - state - > bufdata ,
data , chunk , NULL ) ;
if ( LZ4F_isError ( status ) )
pg_fatal ( " could not compress data: %s " ,
LZ4F_getErrorName ( status ) ) ;
c s- > writeF ( AH , state - > buffer , status ) ;
state - > bufdata + = status ;
data = ( ( char * ) data ) + chunk ;
data = ( ( const char * ) data ) + chunk ;
remaining - = chunk ;
}
}
@ -238,29 +244,32 @@ static void
EndCompressorLZ4 ( ArchiveHandle * AH , CompressorState * cs )
{
LZ4State * state = ( LZ4State * ) cs - > private_data ;
size_t required ;
size_t status ;
/* Nothing needs to be done */
if ( ! state )
return ;
/*
* Write the header if not yet written . The caller is not required to call
* writeData if the relation does not contain any data . Thus it is
* possible to reach here without having flushed the header . Do it before
* ending the compression .
*/
if ( state - > needs_header_flush )
cs - > writeF ( AH , state - > buffer , state - > compressedlen ) ;
/* We might need to flush the buffer to make room for LZ4F_compressEnd */
required = LZ4F_compressBound ( 0 , & state - > prefs ) ;
if ( required > state - > buflen - state - > bufdata )
{
cs - > writeF ( AH , state - > buffer , state - > bufdata ) ;
state - > bufdata = 0 ;
}
status = LZ4F_compressEnd ( state - > ctx ,
state - > buffer , state - > buflen ,
state - > buffer + state - > bufdata ,
state - > buflen - state - > bufdata ,
NULL ) ;
if ( LZ4F_isError ( status ) )
pg_fatal ( " could not end compression: %s " ,
LZ4F_getErrorName ( status ) ) ;
state - > bufdata + = status ;
cs - > writeF ( AH , state - > buffer , status ) ;
/* Write the final bufferload */
cs - > writeF ( AH , state - > buffer , state - > bufdata ) ;
status = LZ4F_freeCompressionContext ( state - > ctx ) ;
if ( LZ4F_isError ( status ) )
@ -302,8 +311,6 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
pg_fatal ( " could not initialize LZ4 compression: %s " ,
LZ4F_getErrorName ( state - > errcode ) ) ;
/* Remember that the header has not been written. */
state - > needs_header_flush = true ;
cs - > private_data = state ;
}
@ -360,19 +367,10 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
state - > compressing = compressing ;
/* When compressing, write LZ4 header to the output stream. */
if ( state - > compressing )
{
if ( ! LZ4State_compression_init ( state ) )
return false ;
errno = 0 ;
if ( fwrite ( state - > buffer , 1 , state - > compressedlen , state - > fp ) ! = state - > compressedlen )
{
errno = ( errno ) ? errno : ENOSPC ;
return false ;
}
}
else
{
@ -573,8 +571,7 @@ static void
LZ4Stream_write ( const void * ptr , size_t size , CompressFileHandle * CFH )
{
LZ4State * state = ( LZ4State * ) CFH - > private_data ;
size_t status ;
int remaining = size ;
size_t remaining = size ;
/* Lazy init */
if ( ! LZ4Stream_init ( state , size , true ) )
@ -583,23 +580,36 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
while ( remaining > 0 )
{
int chunk = Min ( remaining , DEFAULT_IO_BUFFER_SIZE ) ;
size_t chunk ;
size_t required ;
size_t status ;
remaining - = chunk ;
status = LZ4F_compressUpdate ( state - > ctx , state - > buffer , state - > buflen ,
ptr , chunk , NULL ) ;
if ( LZ4F_isError ( status ) )
pg_fatal ( " error during writing: %s " , LZ4F_getErrorName ( status ) ) ;
/* We don't try to present more than DEFAULT_IO_BUFFER_SIZE bytes */
chunk = Min ( remaining , ( size_t ) DEFAULT_IO_BUFFER_SIZE ) ;
/* If not enough space, must flush buffer */
required = LZ4F_compressBound ( chunk , & state - > prefs ) ;
if ( required > state - > buflen - state - > bufdata )
{
errno = 0 ;
if ( fwrite ( state - > buffer , 1 , status , state - > fp ) ! = status )
if ( fwrite ( state - > buffer , 1 , state - > bufdata , state - > fp ) ! = state - > bufdata )
{
errno = ( errno ) ? errno : ENOSPC ;
pg_fatal ( " error during writing: %m " ) ;
}
state - > bufdata = 0 ;
}
status = LZ4F_compressUpdate ( state - > ctx ,
state - > buffer + state - > bufdata ,
state - > buflen - state - > bufdata ,
ptr , chunk , NULL ) ;
if ( LZ4F_isError ( status ) )
pg_fatal ( " error during writing: %s " , LZ4F_getErrorName ( status ) ) ;
state - > bufdata + = status ;
ptr = ( ( const char * ) ptr ) + chunk ;
remaining - = chunk ;
}
}
@ -675,6 +685,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
{
FILE * fp ;
LZ4State * state = ( LZ4State * ) CFH - > private_data ;
size_t required ;
size_t status ;
int ret ;
@ -683,21 +694,37 @@ LZ4Stream_close(CompressFileHandle *CFH)
{
if ( state - > compressing )
{
status = LZ4F_compressEnd ( state - > ctx , state - > buffer , state - > buflen , NULL ) ;
/* We might need to flush the buffer to make room */
required = LZ4F_compressBound ( 0 , & state - > prefs ) ;
if ( required > state - > buflen - state - > bufdata )
{
errno = 0 ;
if ( fwrite ( state - > buffer , 1 , state - > bufdata , state - > fp ) ! = state - > bufdata )
{
errno = ( errno ) ? errno : ENOSPC ;
pg_log_error ( " could not write to output file: %m " ) ;
}
state - > bufdata = 0 ;
}
status = LZ4F_compressEnd ( state - > ctx ,
state - > buffer + state - > bufdata ,
state - > buflen - state - > bufdata ,
NULL ) ;
if ( LZ4F_isError ( status ) )
{
pg_log_error ( " could not end compression: %s " ,
LZ4F_getErrorName ( status ) ) ;
}
else
{
state - > bufdata + = status ;
errno = 0 ;
if ( fwrite ( state - > buffer , 1 , status , state - > fp ) ! = status )
if ( fwrite ( state - > buffer , 1 , state - > bufdata , state - > fp ) ! = state - > bufdata )
{
errno = ( errno ) ? errno : ENOSPC ;
pg_log_error ( " could not write to output file: %m " ) ;
}
}
status = LZ4F_freeCompressionContext ( state - > ctx ) ;
if ( LZ4F_isError ( status ) )