@ -42,6 +42,7 @@
# include <sys/stat.h>
# include <sys/stat.h>
# include <unistd.h>
# include <unistd.h>
# include "access/xact.h"
# include "libpq/be-fsstubs.h"
# include "libpq/be-fsstubs.h"
# include "libpq/libpq-fs.h"
# include "libpq/libpq-fs.h"
# include "miscadmin.h"
# include "miscadmin.h"
@ -50,6 +51,7 @@
# include "utils/acl.h"
# include "utils/acl.h"
# include "utils/builtins.h"
# include "utils/builtins.h"
# include "utils/memutils.h"
# include "utils/memutils.h"
# include "utils/snapmgr.h"
/* define this to enable debug logging */
/* define this to enable debug logging */
/* #define FSDB 1 */
/* #define FSDB 1 */
@ -67,19 +69,11 @@
static LargeObjectDesc * * cookies = NULL ;
static LargeObjectDesc * * cookies = NULL ;
static int cookies_size = 0 ;
static int cookies_size = 0 ;
static bool lo_cleanup_needed = false ;
static MemoryContext fscxt = NULL ;
static MemoryContext fscxt = NULL ;
# define CreateFSContext() \
static int newLOfd ( void ) ;
do { \
static void closeLOfd ( int fd ) ;
if ( fscxt = = NULL ) \
fscxt = AllocSetContextCreate ( TopMemoryContext , \
" Filesystem " , \
ALLOCSET_DEFAULT_SIZES ) ; \
} while ( 0 )
static int newLOfd ( LargeObjectDesc * lobjCookie ) ;
static void deleteLOfd ( int fd ) ;
static Oid lo_import_internal ( text * filename , Oid lobjOid ) ;
static Oid lo_import_internal ( text * filename , Oid lobjOid ) ;
@ -99,11 +93,26 @@ be_lo_open(PG_FUNCTION_ARGS)
elog ( DEBUG4 , " lo_open(%u,%d) " , lobjId , mode ) ;
elog ( DEBUG4 , " lo_open(%u,%d) " , lobjId , mode ) ;
# endif
# endif
CreateFSContext ( ) ;
/*
* Allocate a large object descriptor first . This will also create
* ' fscxt ' if this is the first LO opened in this transaction .
*/
fd = newLOfd ( ) ;
lobjDesc = inv_open ( lobjId , mode , fscxt ) ;
lobjDesc = inv_open ( lobjId , mode , fscxt ) ;
lobjDesc - > subid = GetCurrentSubTransactionId ( ) ;
/*
* We must register the snapshot in TopTransaction ' s resowner so that it
* stays alive until the LO is closed rather than until the current portal
* shuts down .
*/
if ( lobjDesc - > snapshot )
lobjDesc - > snapshot = RegisterSnapshotOnOwner ( lobjDesc - > snapshot ,
TopTransactionResourceOwner ) ;
fd = newLOfd ( lobjDesc ) ;
Assert ( cookies [ fd ] = = NULL ) ;
cookies [ fd ] = lobjDesc ;
PG_RETURN_INT32 ( fd ) ;
PG_RETURN_INT32 ( fd ) ;
}
}
@ -122,9 +131,7 @@ be_lo_close(PG_FUNCTION_ARGS)
elog ( DEBUG4 , " lo_close(%d) " , fd ) ;
elog ( DEBUG4 , " lo_close(%d) " , fd ) ;
# endif
# endif
inv_close ( cookies [ fd ] ) ;
closeLOfd ( fd ) ;
deleteLOfd ( fd ) ;
PG_RETURN_INT32 ( 0 ) ;
PG_RETURN_INT32 ( 0 ) ;
}
}
@ -238,12 +245,7 @@ be_lo_creat(PG_FUNCTION_ARGS)
{
{
Oid lobjId ;
Oid lobjId ;
/*
lo_cleanup_needed = true ;
* We don ' t actually need to store into fscxt , but create it anyway to
* ensure that AtEOXact_LargeObject knows there is state to clean up
*/
CreateFSContext ( ) ;
lobjId = inv_create ( InvalidOid ) ;
lobjId = inv_create ( InvalidOid ) ;
PG_RETURN_OID ( lobjId ) ;
PG_RETURN_OID ( lobjId ) ;
@ -254,12 +256,7 @@ be_lo_create(PG_FUNCTION_ARGS)
{
{
Oid lobjId = PG_GETARG_OID ( 0 ) ;
Oid lobjId = PG_GETARG_OID ( 0 ) ;
/*
lo_cleanup_needed = true ;
* We don ' t actually need to store into fscxt , but create it anyway to
* ensure that AtEOXact_LargeObject knows there is state to clean up
*/
CreateFSContext ( ) ;
lobjId = inv_create ( lobjId ) ;
lobjId = inv_create ( lobjId ) ;
PG_RETURN_OID ( lobjId ) ;
PG_RETURN_OID ( lobjId ) ;
@ -330,16 +327,13 @@ be_lo_unlink(PG_FUNCTION_ARGS)
for ( i = 0 ; i < cookies_size ; i + + )
for ( i = 0 ; i < cookies_size ; i + + )
{
{
if ( cookies [ i ] ! = NULL & & cookies [ i ] - > id = = lobjId )
if ( cookies [ i ] ! = NULL & & cookies [ i ] - > id = = lobjId )
{
closeLOfd ( i ) ;
inv_close ( cookies [ i ] ) ;
deleteLOfd ( i ) ;
}
}
}
}
}
/*
/*
* inv_drop does not create a need for end - of - transaction cleanup and
* inv_drop does not create a need for end - of - transaction cleanup and
* hence we don ' t need to have created fscxt .
* hence we don ' t need to set lo_cleanup_needed .
*/
*/
PG_RETURN_INT32 ( inv_drop ( lobjId ) ) ;
PG_RETURN_INT32 ( inv_drop ( lobjId ) ) ;
}
}
@ -419,8 +413,6 @@ lo_import_internal(text *filename, Oid lobjOid)
LargeObjectDesc * lobj ;
LargeObjectDesc * lobj ;
Oid oid ;
Oid oid ;
CreateFSContext ( ) ;
/*
/*
* open the file to be read in
* open the file to be read in
*/
*/
@ -435,12 +427,13 @@ lo_import_internal(text *filename, Oid lobjOid)
/*
/*
* create an inversion object
* create an inversion object
*/
*/
lo_cleanup_needed = true ;
oid = inv_create ( lobjOid ) ;
oid = inv_create ( lobjOid ) ;
/*
/*
* read in from the filesystem and write to the inversion object
* read in from the filesystem and write to the inversion object
*/
*/
lobj = inv_open ( oid , INV_WRITE , fsc xt) ;
lobj = inv_open ( oid , INV_WRITE , CurrentMemoryConte xt) ;
while ( ( nbytes = read ( fd , buf , BUFSIZE ) ) > 0 )
while ( ( nbytes = read ( fd , buf , BUFSIZE ) ) > 0 )
{
{
@ -482,12 +475,11 @@ be_lo_export(PG_FUNCTION_ARGS)
LargeObjectDesc * lobj ;
LargeObjectDesc * lobj ;
mode_t oumask ;
mode_t oumask ;
CreateFSContext ( ) ;
/*
/*
* open the inversion object ( no need to test for failure )
* open the inversion object ( no need to test for failure )
*/
*/
lobj = inv_open ( lobjId , INV_READ , fscxt ) ;
lo_cleanup_needed = true ;
lobj = inv_open ( lobjId , INV_READ , CurrentMemoryContext ) ;
/*
/*
* open the file to be written to
* open the file to be written to
@ -592,20 +584,22 @@ AtEOXact_LargeObject(bool isCommit)
{
{
int i ;
int i ;
if ( fscxt = = NULL )
if ( ! lo_cleanup_needed )
return ; /* no LO operations in this xact */
return ; /* no LO operations in this xact */
/*
/*
* Close LO fds and clear cookies array so that LO fds are no longer good .
* Close LO fds and clear cookies array so that LO fds are no longer good .
* On abort we skip the close step .
* The memory context and resource owner holding them are going away at
* the end - of - transaction anyway , but on commit , we need to close them to
* avoid warnings about leaked resources at commit . On abort we can skip
* this step .
*/
*/
for ( i = 0 ; i < cookies_size ; i + + )
if ( isCommit )
{
{
if ( cookies [ i ] ! = NULL )
for ( i = 0 ; i < cookies_size ; i + + )
{
{
if ( isCommit )
if ( cookies [ i ] ! = NULL )
inv_close ( cookies [ i ] ) ;
closeLOfd ( i ) ;
deleteLOfd ( i ) ;
}
}
}
}
@ -614,11 +608,14 @@ AtEOXact_LargeObject(bool isCommit)
cookies_size = 0 ;
cookies_size = 0 ;
/* Release the LO memory context to prevent permanent memory leaks. */
/* Release the LO memory context to prevent permanent memory leaks. */
MemoryContextDelete ( fscxt ) ;
if ( fscxt )
MemoryContextDelete ( fscxt ) ;
fscxt = NULL ;
fscxt = NULL ;
/* Give inv_api.c a chance to clean up, too */
/* Give inv_api.c a chance to clean up, too */
close_lo_relation ( isCommit ) ;
close_lo_relation ( isCommit ) ;
lo_cleanup_needed = false ;
}
}
/*
/*
@ -646,14 +643,7 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
if ( isCommit )
if ( isCommit )
lo - > subid = parentSubid ;
lo - > subid = parentSubid ;
else
else
{
closeLOfd ( i ) ;
/*
* Make sure we do not call inv_close twice if it errors out
* for some reason . Better a leak than a crash .
*/
deleteLOfd ( i ) ;
inv_close ( lo ) ;
}
}
}
}
}
}
}
@ -663,19 +653,22 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
static int
static int
newLOfd ( LargeObjectDesc * lobjCookie )
newLOfd ( void )
{
{
int i ,
int i ,
newsize ;
newsize ;
lo_cleanup_needed = true ;
if ( fscxt = = NULL )
fscxt = AllocSetContextCreate ( TopMemoryContext ,
" Filesystem " ,
ALLOCSET_DEFAULT_SIZES ) ;
/* Try to find a free slot */
/* Try to find a free slot */
for ( i = 0 ; i < cookies_size ; i + + )
for ( i = 0 ; i < cookies_size ; i + + )
{
{
if ( cookies [ i ] = = NULL )
if ( cookies [ i ] = = NULL )
{
cookies [ i ] = lobjCookie ;
return i ;
return i ;
}
}
}
/* No free slot, so make the array bigger */
/* No free slot, so make the array bigger */
@ -700,15 +693,25 @@ newLOfd(LargeObjectDesc *lobjCookie)
cookies_size = newsize ;
cookies_size = newsize ;
}
}
Assert ( cookies [ i ] = = NULL ) ;
cookies [ i ] = lobjCookie ;
return i ;
return i ;
}
}
static void
static void
delet eLOfd( int fd )
clos eLOfd( int fd )
{
{
LargeObjectDesc * lobj ;
/*
* Make sure we do not try to free twice if this errors out for some
* reason . Better a leak than a crash .
*/
lobj = cookies [ fd ] ;
cookies [ fd ] = NULL ;
cookies [ fd ] = NULL ;
if ( lobj - > snapshot )
UnregisterSnapshotFromOwner ( lobj - > snapshot ,
TopTransactionResourceOwner ) ;
inv_close ( lobj ) ;
}
}
/*****************************************************************************
/*****************************************************************************
@ -727,13 +730,8 @@ lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
int total_read PG_USED_FOR_ASSERTS_ONLY ;
int total_read PG_USED_FOR_ASSERTS_ONLY ;
bytea * result = NULL ;
bytea * result = NULL ;
/*
lo_cleanup_needed = true ;
* We don ' t actually need to store into fscxt , but create it anyway to
loDesc = inv_open ( loOid , INV_READ , CurrentMemoryContext ) ;
* ensure that AtEOXact_LargeObject knows there is state to clean up
*/
CreateFSContext ( ) ;
loDesc = inv_open ( loOid , INV_READ , fscxt ) ;
/*
/*
* Compute number of bytes we ' ll actually read , accommodating nbytes = = - 1
* Compute number of bytes we ' ll actually read , accommodating nbytes = = - 1
@ -817,10 +815,9 @@ be_lo_from_bytea(PG_FUNCTION_ARGS)
LargeObjectDesc * loDesc ;
LargeObjectDesc * loDesc ;
int written PG_USED_FOR_ASSERTS_ONLY ;
int written PG_USED_FOR_ASSERTS_ONLY ;
CreateFSContext ( ) ;
lo_cleanup_needed = true ;
loOid = inv_create ( loOid ) ;
loOid = inv_create ( loOid ) ;
loDesc = inv_open ( loOid , INV_WRITE , fsc xt) ;
loDesc = inv_open ( loOid , INV_WRITE , CurrentMemoryConte xt) ;
written = inv_write ( loDesc , VARDATA_ANY ( str ) , VARSIZE_ANY_EXHDR ( str ) ) ;
written = inv_write ( loDesc , VARDATA_ANY ( str ) , VARSIZE_ANY_EXHDR ( str ) ) ;
Assert ( written = = VARSIZE_ANY_EXHDR ( str ) ) ;
Assert ( written = = VARSIZE_ANY_EXHDR ( str ) ) ;
inv_close ( loDesc ) ;
inv_close ( loDesc ) ;
@ -840,9 +837,8 @@ be_lo_put(PG_FUNCTION_ARGS)
LargeObjectDesc * loDesc ;
LargeObjectDesc * loDesc ;
int written PG_USED_FOR_ASSERTS_ONLY ;
int written PG_USED_FOR_ASSERTS_ONLY ;
CreateFSContext ( ) ;
lo_cleanup_needed = true ;
loDesc = inv_open ( loOid , INV_WRITE , CurrentMemoryContext ) ;
loDesc = inv_open ( loOid , INV_WRITE , fscxt ) ;
/* Permission check */
/* Permission check */
if ( ! lo_compat_privileges & &
if ( ! lo_compat_privileges & &