@ -109,75 +109,17 @@ xlogVacuumPage(Relation index, Buffer buffer)
PageSetLSN ( page , recptr ) ;
}
static bool
ginVacuumPostingTreeLeaves ( GinVacuumState * gvs , BlockNumber blkno , bool isRoot , Buffer * rootBuffer )
{
Buffer buffer ;
Page page ;
bool hasVoidPage = FALSE ;
MemoryContext oldCxt ;
buffer = ReadBufferExtended ( gvs - > index , MAIN_FORKNUM , blkno ,
RBM_NORMAL , gvs - > strategy ) ;
page = BufferGetPage ( buffer ) ;
/*
* We should be sure that we don ' t concurrent with inserts , insert process
* never release root page until end ( but it can unlock it and lock
* again ) . New scan can ' t start but previously started ones work
* concurrently .
*/
if ( isRoot )
LockBufferForCleanup ( buffer ) ;
else
LockBuffer ( buffer , GIN_EXCLUSIVE ) ;
Assert ( GinPageIsData ( page ) ) ;
if ( GinPageIsLeaf ( page ) )
{
oldCxt = MemoryContextSwitchTo ( gvs - > tmpCxt ) ;
ginVacuumPostingTreeLeaf ( gvs - > index , buffer , gvs ) ;
MemoryContextSwitchTo ( oldCxt ) ;
MemoryContextReset ( gvs - > tmpCxt ) ;
/* if root is a leaf page, we don't desire further processing */
if ( ! isRoot & & ! hasVoidPage & & GinDataLeafPageIsEmpty ( page ) )
hasVoidPage = TRUE ;
}
else
{
OffsetNumber i ;
bool isChildHasVoid = FALSE ;
for ( i = FirstOffsetNumber ; i < = GinPageGetOpaque ( page ) - > maxoff ; i + + )
{
PostingItem * pitem = GinDataPageGetPostingItem ( page , i ) ;
if ( ginVacuumPostingTreeLeaves ( gvs , PostingItemGetBlockNumber ( pitem ) , FALSE , NULL ) )
isChildHasVoid = TRUE ;
}
if ( isChildHasVoid )
hasVoidPage = TRUE ;
}
typedef struct DataPageDeleteStack
{
struct DataPageDeleteStack * child ;
struct DataPageDeleteStack * parent ;
/*
* if we have root and there are empty pages in tree , then we don ' t
* release lock to go further processing and guarantee that tree is unused
*/
if ( ! ( isRoot & & hasVoidPage ) )
{
UnlockReleaseBuffer ( buffer ) ;
}
else
{
Assert ( rootBuffer ) ;
* rootBuffer = buffer ;
}
BlockNumber blkno ; /* current block number */
BlockNumber leftBlkno ; /* rightest non-deleted page on left */
bool isRoot ;
} DataPageDeleteStack ;
return hasVoidPage ;
}
/*
* Delete a posting tree page .
@ -194,8 +136,13 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn
BlockNumber rightlink ;
/*
* Lock the pages in the same order as an insertion would , to avoid
* deadlocks : left , then right , then parent .
* This function MUST be called only if someone of parent pages hold
* exclusive cleanup lock . This guarantees that no insertions currently
* happen in this subtree . Caller also acquire Exclusive lock on deletable
* page and is acquiring and releasing exclusive lock on left page before .
* Left page was locked and released . Then parent and this page are locked .
* We acquire left page lock here only to mark page dirty after changing
* right pointer .
*/
lBuffer = ReadBufferExtended ( gvs - > index , MAIN_FORKNUM , leftBlkno ,
RBM_NORMAL , gvs - > strategy ) ;
@ -205,10 +152,6 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn
RBM_NORMAL , gvs - > strategy ) ;
LockBuffer ( lBuffer , GIN_EXCLUSIVE ) ;
LockBuffer ( dBuffer , GIN_EXCLUSIVE ) ;
if ( ! isParentRoot ) /* parent is already locked by
* LockBufferForCleanup ( ) */
LockBuffer ( pBuffer , GIN_EXCLUSIVE ) ;
START_CRIT_SECTION ( ) ;
@ -272,26 +215,15 @@ ginDeletePage(GinVacuumState *gvs, BlockNumber deleteBlkno, BlockNumber leftBlkn
PageSetLSN ( BufferGetPage ( lBuffer ) , recptr ) ;
}
if ( ! isParentRoot )
LockBuffer ( pBuffer , GIN_UNLOCK ) ;
ReleaseBuffer ( pBuffer ) ;
UnlockReleaseBuffer ( lBuffer ) ;
Unlock ReleaseBuffer( dBuffer ) ;
ReleaseBuffer ( dBuffer ) ;
END_CRIT_SECTION ( ) ;
gvs - > result - > pages_deleted + + ;
}
typedef struct DataPageDeleteStack
{
struct DataPageDeleteStack * child ;
struct DataPageDeleteStack * parent ;
BlockNumber blkno ; /* current block number */
BlockNumber leftBlkno ; /* rightest non-deleted page on left */
bool isRoot ;
} DataPageDeleteStack ;
/*
* scans posting tree and deletes empty pages
@ -325,6 +257,10 @@ ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
buffer = ReadBufferExtended ( gvs - > index , MAIN_FORKNUM , blkno ,
RBM_NORMAL , gvs - > strategy ) ;
if ( ! isRoot )
LockBuffer ( buffer , GIN_EXCLUSIVE ) ;
page = BufferGetPage ( buffer ) ;
Assert ( GinPageIsData ( page ) ) ;
@ -359,6 +295,9 @@ ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
}
}
if ( ! isRoot )
LockBuffer ( buffer , GIN_UNLOCK ) ;
ReleaseBuffer ( buffer ) ;
if ( ! meDelete )
@ -367,37 +306,124 @@ ginScanToDelete(GinVacuumState *gvs, BlockNumber blkno, bool isRoot,
return meDelete ;
}
static void
ginVacuumPostingTree ( GinVacuumState * gvs , BlockNumber rootBlkno )
/*
* Scan through posting tree , delete empty tuples from leaf pages .
* Also , this function collects empty subtrees ( with all empty leafs ) .
* For parents of these subtrees CleanUp lock is taken , then we call
* ScanToDelete . This is done for every inner page , which points to
* empty subtree .
*/
static bool
ginVacuumPostingTreeLeaves ( GinVacuumState * gvs , BlockNumber blkno , bool isRoot )
{
Buffer rootBuffer = InvalidBuffer ;
DataPageDeleteStack root ,
* ptr ,
* tmp ;
Buffer buffer ;
Page page ;
bool hasVoidPage = FALSE ;
MemoryContext oldCxt ;
buffer = ReadBufferExtended ( gvs - > index , MAIN_FORKNUM , blkno ,
RBM_NORMAL , gvs - > strategy ) ;
page = BufferGetPage ( buffer ) ;
ginTraverseLock ( buffer , false ) ;
if ( ginVacuumPostingTreeLeaves ( gvs , rootBlkno , TRUE , & rootBuffer ) = = FALSE )
Assert ( GinPageIsData ( page ) ) ;
if ( GinPageIsLeaf ( page ) )
{
Assert ( rootBuffer = = InvalidBuffer ) ;
return ;
oldCxt = MemoryContextSwitchTo ( gvs - > tmpCxt ) ;
ginVacuumPostingTreeLeaf ( gvs - > index , buffer , gvs ) ;
MemoryContextSwitchTo ( oldCxt ) ;
MemoryContextReset ( gvs - > tmpCxt ) ;
/* if root is a leaf page, we don't desire further processing */
if ( GinDataLeafPageIsEmpty ( page ) )
hasVoidPage = TRUE ;
UnlockReleaseBuffer ( buffer ) ;
return hasVoidPage ;
}
else
{
OffsetNumber i ;
bool hasEmptyChild = FALSE ;
bool hasNonEmptyChild = FALSE ;
OffsetNumber maxoff = GinPageGetOpaque ( page ) - > maxoff ;
BlockNumber * children = palloc ( sizeof ( BlockNumber ) * ( maxoff + 1 ) ) ;
/*
* Read all children BlockNumbers .
* Not sure it is safe if there are many concurrent vacuums .
*/
memset ( & root , 0 , sizeof ( DataPageDeleteStack ) ) ;
root . leftBlkno = InvalidBlockNumber ;
root . isRoot = TRUE ;
for ( i = FirstOffsetNumber ; i < = maxoff ; i + + )
{
PostingItem * pitem = GinDataPageGetPostingItem ( page , i ) ;
vacuum_delay_point ( ) ;
children [ i ] = PostingItemGetBlockNumber ( pitem ) ;
}
ginScanToDelete ( gvs , rootBlkno , TRUE , & root , InvalidOffsetNumber ) ;
UnlockReleaseBuffer ( buff er ) ;
ptr = root . child ;
while ( ptr )
{
tmp = ptr - > child ;
pfree ( ptr ) ;
ptr = tmp ;
for ( i = FirstOffsetNumber ; i < = maxoff ; i + + )
{
if ( ginVacuumPostingTreeLeaves ( gvs , children [ i ] , FALSE ) )
hasEmptyChild = TRUE ;
else
hasNonEmptyChild = TRUE ;
}
pfree ( children ) ;
vacuum_delay_point ( ) ;
/*
* All subtree is empty - just return TRUE to indicate that parent must
* do a cleanup . Unless we are ROOT an there is way to go upper .
*/
if ( hasEmptyChild & & ! hasNonEmptyChild & & ! isRoot )
return TRUE ;
if ( hasEmptyChild )
{
DataPageDeleteStack root ,
* ptr ,
* tmp ;
buffer = ReadBufferExtended ( gvs - > index , MAIN_FORKNUM , blkno ,
RBM_NORMAL , gvs - > strategy ) ;
LockBufferForCleanup ( buffer ) ;
memset ( & root , 0 , sizeof ( DataPageDeleteStack ) ) ;
root . leftBlkno = InvalidBlockNumber ;
root . isRoot = TRUE ;
ginScanToDelete ( gvs , blkno , TRUE , & root , InvalidOffsetNumber ) ;
ptr = root . child ;
while ( ptr )
{
tmp = ptr - > child ;
pfree ( ptr ) ;
ptr = tmp ;
}
UnlockReleaseBuffer ( buffer ) ;
}
/* Here we have deleted all empty subtrees */
return FALSE ;
}
}
UnlockReleaseBuffer ( rootBuffer ) ;
static void
ginVacuumPostingTree ( GinVacuumState * gvs , BlockNumber rootBlkno )
{
ginVacuumPostingTreeLeaves ( gvs , rootBlkno , TRUE ) ;
}
/*