@@ -411,51 +411,250 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
 }
 
 /*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Tuple defrag support for PageRepairFragmentation and PageIndexMultiDelete
  */
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
 {
     uint16      offsetindex;    /* linp array index */
     int16       itemoff;        /* page offset of item data */
     uint16      alignedlen;     /* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
-    /* Sort in decreasing itemoff order */
-    return ((itemIdSort) itemidp2)->itemoff -
-        ((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
 
 /*
  * After removing or marking some line pointers unused, move the tuples to
- * remove the gaps caused by the removed items.
+ * remove the gaps caused by the removed items and reorder them back into
+ * reverse line pointer order in the page.
+ *
+ * This function can often be fairly hot, so it pays to take some measures to
+ * make it as optimal as possible.
+ *
+ * Callers may pass 'presorted' as true if the 'itemidbase' array is sorted in
+ * descending order of itemoff.  When this is true we can just memmove()
+ * tuples towards the end of the page.  This is quite a common case as it's
+ * the order that tuples are initially inserted into pages.  When we call this
+ * function to defragment the tuples in the page then any new line pointers
+ * added to the page will keep that presorted order, so hitting this case is
+ * still very common for tables that are commonly updated.
+ *
+ * When the 'itemidbase' array is not presorted then we're unable to just
+ * memmove() tuples around freely.  Doing so could cause us to overwrite the
+ * memory belonging to a tuple we've not moved yet.  In this case, we copy all
+ * the tuples that need to be moved into a temporary buffer.  We can then
+ * simply memcpy() out of that temp buffer back into the page at the correct
+ * location.  Tuples are copied back into the page in the same order as the
+ * 'itemidbase' array, so we end up reordering the tuples back into reverse
+ * line pointer order.  This will increase the chances of hitting the
+ * presorted case the next time around.
+ *
+ * Callers must ensure that nitems is > 0.
  */
 static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
 {
     PageHeader  phdr = (PageHeader) page;
     Offset      upper;
+    Offset      copy_tail;
+    Offset      copy_head;
+    itemIdCompact itemidptr;
     int         i;
 
-    /* sort itemIdSortData array into decreasing itemoff order */
-    qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-          itemoffcompare);
+    /* Code within will not work correctly if nitems == 0 */
+    Assert(nitems > 0);
 
-    upper = phdr->pd_special;
-    for (i = 0; i < nitems; i++)
+    if (presorted)
     {
-        itemIdSort  itemidptr = &itemidbase[i];
-        ItemId      lp;
 
-        lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-        upper -= itemidptr->alignedlen;
+#ifdef USE_ASSERT_CHECKING
+        {
+            /*
+             * Verify we've not gotten any new callers that are incorrectly
+             * passing a true presorted value.
+             */
+            Offset      lastoff = phdr->pd_special;
+
+            for (i = 0; i < nitems; i++)
+            {
+                itemidptr = &itemidbase[i];
+
+                Assert(lastoff > itemidptr->itemoff);
+
+                lastoff = itemidptr->itemoff;
+            }
+        }
+#endif                          /* USE_ASSERT_CHECKING */
+
+        /*
+         * 'itemidbase' is already in the optimal order, i.e., lower item
+         * pointers have a higher offset.  This allows us to memmove() the
+         * tuples up to the end of the page without having to worry about
+         * overwriting other tuples that have not been moved yet.
+         *
+         * There's a good chance that there are tuples already right at the
+         * end of the page that we can simply skip over because they're
+         * already in the correct location within the page.  We'll do that
+         * first...
+         */
+        upper = phdr->pd_special;
+        i = 0;
+        do
+        {
+            itemidptr = &itemidbase[i];
+            if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+                break;
+            upper -= itemidptr->alignedlen;
+
+            i++;
+        } while (i < nitems);
+
+        /*
+         * Now that we've found the first tuple that needs to be moved, we can
+         * do the tuple compactification.  We try and make the least number of
+         * memmove() calls and only call memmove() when there's a gap.  When
+         * we see a gap we just move all tuples after the gap up until the
+         * point of the last move operation.
+         */
+        copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+        for (; i < nitems; i++)
+        {
+            ItemId      lp;
+
+            itemidptr = &itemidbase[i];
+            lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+            if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+            {
+                memmove((char *) page + upper,
+                        page + copy_head,
+                        copy_tail - copy_head);
+
+                /*
+                 * We've now moved all tuples already seen, but not the
+                 * current tuple, so we set the copy_tail to the end of this
+                 * tuple so it can be moved in another iteration of the loop.
+                 */
+                copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+            }
+            /* shift the target offset down by the length of this tuple */
+            upper -= itemidptr->alignedlen;
+            /* point the copy_head to the start of this tuple */
+            copy_head = itemidptr->itemoff;
+
+            /* update the line pointer to reference the new offset */
+            lp->lp_off = upper;
+        }
+
+        /* move the remaining tuples. */
         memmove((char *) page + upper,
-                (char *) page + itemidptr->itemoff,
-                itemidptr->alignedlen);
-        lp->lp_off = upper;
+                page + copy_head,
+                copy_tail - copy_head);
     }
+    else
+    {
+        PGAlignedBlock scratch;
+        char       *scratchptr = scratch.data;
+
+        /*
+         * Non-presorted case:  The tuples in the itemidbase array may be in
+         * any order.  So, in order to move these to the end of the page we
+         * must make a temp copy of each tuple that needs to be moved before
+         * we copy them back into the page at the new offset.
+         *
+         * If a large percentage of tuples have been pruned (>75%) then we'll
+         * copy these into the temp buffer tuple-by-tuple, otherwise, we'll
+         * just do a single memcpy() for all tuples that need to be moved.
+         * When so many tuples have been removed there's likely to be a lot of
+         * gaps and it's unlikely that many non-movable tuples remain at the
+         * end of the page.
+         */
+        if (nitems < PageGetMaxOffsetNumber(page) / 4)
+        {
+            i = 0;
+            do
+            {
+                itemidptr = &itemidbase[i];
+                memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
+                       itemidptr->alignedlen);
+                i++;
+            } while (i < nitems);
+
+            /* Set things up for the compactification code below */
+            i = 0;
+            itemidptr = &itemidbase[0];
+            upper = phdr->pd_special;
+        }
+        else
+        {
+            upper = phdr->pd_special;
+
+            /*
+             * Many tuples are likely to already be in the correct location.
+             * There's no need to copy these into the temp buffer.  Instead
+             * we'll just skip forward in the itemidbase array to the position
+             * that we do need to move tuples from so that the code below just
+             * leaves these ones alone.
+             */
+            i = 0;
+            do
+            {
+                itemidptr = &itemidbase[i];
+                if (upper != itemidptr->itemoff + itemidptr->alignedlen)
+                    break;
+                upper -= itemidptr->alignedlen;
+
+                i++;
+            } while (i < nitems);
+
+            /* Copy all tuples that need to be moved into the temp buffer */
+            memcpy(scratchptr + phdr->pd_upper,
+                   page + phdr->pd_upper,
+                   upper - phdr->pd_upper);
+        }
+
+        /*
+         * Do the tuple compactification.  itemidptr is already pointing to
+         * the first tuple that we're going to move.  Here we collapse the
+         * memcpy calls for adjacent tuples into a single call.  This is done
+         * by delaying the memcpy call until we find a gap that needs to be
+         * closed.
+         */
+        copy_tail = copy_head = itemidptr->itemoff + itemidptr->alignedlen;
+        for (; i < nitems; i++)
+        {
+            ItemId      lp;
+
+            itemidptr = &itemidbase[i];
+            lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+            /* copy pending tuples when we detect a gap */
+            if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+            {
+                memcpy((char *) page + upper,
+                       scratchptr + copy_head,
+                       copy_tail - copy_head);
+
+                /*
+                 * We've now copied all tuples already seen, but not the
+                 * current tuple, so we set the copy_tail to the end of this
+                 * tuple.
+                 */
+                copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+            }
+            /* shift the target offset down by the length of this tuple */
+            upper -= itemidptr->alignedlen;
+            /* point the copy_head to the start of this tuple */
+            copy_head = itemidptr->itemoff;
+
+            /* update the line pointer to reference the new offset */
+            lp->lp_off = upper;
+        }
+
+        /* Copy the remaining chunk */
+        memcpy((char *) page + upper,
+               scratchptr + copy_head,
+               copy_tail - copy_head);
+    }
 
     phdr->pd_upper = upper;
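
To see what the presorted branch is doing in isolation: the loop above issues a memmove() only when a gap interrupts an otherwise-contiguous run of tuples, so a run of N adjacent tuples costs one move rather than N. The standalone C program below is a minimal sketch of that batching on a plain byte buffer; the item struct, toy_compact() and the layout in main() are inventions for this note, not code from the patch.

#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef struct
{
    int     itemoff;        /* current offset of the item's data in buf */
    int     alignedlen;     /* length of the item's data */
} item;

/*
 * Compact items to the end of buf.  'items' must be sorted by descending
 * itemoff, as in the presorted case above: adjacent runs are moved with a
 * single memmove(), issued only when a gap is found.  Returns the new
 * "upper" bound of the compacted data.
 */
static int
toy_compact(char *buf, int bufsize, item *items, int nitems)
{
    int     upper = bufsize;
    int     copy_head,
            copy_tail;
    int     i = 0;

    assert(nitems > 0);

    /* skip items already packed hard against the end of the buffer */
    while (i < nitems && upper == items[i].itemoff + items[i].alignedlen)
        upper -= items[i++].alignedlen;

    if (i == nitems)
        return upper;       /* everything was already in place */

    copy_tail = copy_head = items[i].itemoff + items[i].alignedlen;
    for (; i < nitems; i++)
    {
        /* a gap interrupts the run: flush everything gathered so far */
        if (copy_head != items[i].itemoff + items[i].alignedlen)
        {
            memmove(buf + upper, buf + copy_head, copy_tail - copy_head);
            copy_tail = items[i].itemoff + items[i].alignedlen;
        }
        upper -= items[i].alignedlen;
        copy_head = items[i].itemoff;
        items[i].itemoff = upper;   /* analogous to lp->lp_off = upper */
    }

    /* flush the final run */
    memmove(buf + upper, buf + copy_head, copy_tail - copy_head);
    return upper;
}

int
main(void)
{
    /* "C", "BBB" and "AA" at descending offsets 4, 7 and 14, with gaps */
    char    buf[17] = "....C..BBB....AA";
    item    items[] = {{14, 2}, {7, 3}, {4, 1}};
    int     upper = toy_compact(buf, 16, items, 3);

    printf("upper=%d data=%.*s\n", upper, 16 - upper, buf + upper);
    /* prints: upper=10 data=CBBBAA */
    return 0;
}

Here "AA" is already in place and gets skipped, and the whole job completes in two memmove() calls instead of three; on a real page with long runs of adjacent tuples the saving is proportionally larger.
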
@@ -477,14 +676,16 @@ PageRepairFragmentation(Page page)
     Offset      pd_lower = ((PageHeader) page)->pd_lower;
     Offset      pd_upper = ((PageHeader) page)->pd_upper;
     Offset      pd_special = ((PageHeader) page)->pd_special;
-    itemIdSortData itemidbase[MaxHeapTuplesPerPage];
-    itemIdSort  itemidptr;
+    Offset      last_offset;
+    itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+    itemIdCompact itemidptr;
     ItemId      lp;
     int         nline,
                 nstorage,
                 nunused;
     int         i;
     Size        totallen;
+    bool        presorted = true;   /* For now */
 
     /*
      * It's worth the trouble to be more paranoid here than in most places,
@@ -509,6 +710,7 @@ PageRepairFragmentation(Page page)
     nline = PageGetMaxOffsetNumber(page);
     itemidptr = itemidbase;
     nunused = totallen = 0;
+    last_offset = pd_special;
     for (i = FirstOffsetNumber; i <= nline; i++)
     {
         lp = PageGetItemId(page, i);
@@ -518,6 +720,12 @@ PageRepairFragmentation(Page page)
         {
             itemidptr->offsetindex = i - 1;
             itemidptr->itemoff = ItemIdGetOffset(lp);
+
+            if (last_offset > itemidptr->itemoff)
+                last_offset = itemidptr->itemoff;
+            else
+                presorted = false;
+
             if (unlikely(itemidptr->itemoff < (int) pd_upper ||
                          itemidptr->itemoff >= (int) pd_special))
                 ereport(ERROR,
@@ -552,7 +760,7 @@ PageRepairFragmentation(Page page)
errmsg ( " corrupted item lengths: total %u, available space %u " ,
( unsigned int ) totallen , pd_special - pd_lower ) ) ) ;
compactify_tuples ( itemidbase , nstorage , page ) ;
compactify_tuples ( itemidbase , nstorage , page , presorted ) ;
}
/* Set hint bit for PageAddItem */
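
The presorted flag that reaches this call is just a running strict minimum over the item offsets, maintained during a line pointer scan PageRepairFragmentation performs anyway, so computing it is close to free. A freestanding restatement of the check (offsets_presorted() and its parameters are hypothetical names for this note, not part of the patch):

#include <stdbool.h>
#include <stdio.h>

/*
 * True if offs[0..n-1] is strictly decreasing, i.e. the tuples appear in
 * reverse line pointer order.  'page_end' plays the role of pd_special.
 * Equal offsets count as unsorted, matching the 'last_offset >' test above.
 */
static bool
offsets_presorted(const int *offs, int n, int page_end)
{
    int     last_offset = page_end;

    for (int i = 0; i < n; i++)
    {
        if (last_offset > offs[i])
            last_offset = offs[i];  /* still strictly descending */
        else
            return false;   /* compactify_tuples() must take the slow path */
    }
    return true;
}

int
main(void)
{
    int     sorted[] = {14, 7, 4};
    int     unsorted[] = {7, 14, 4};

    printf("%d %d\n", offsets_presorted(sorted, 3, 16),
           offsets_presorted(unsorted, 3, 16));     /* prints: 1 0 */
    return 0;
}
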
@@ -831,9 +1039,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     Offset      pd_lower = phdr->pd_lower;
     Offset      pd_upper = phdr->pd_upper;
     Offset      pd_special = phdr->pd_special;
-    itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+    Offset      last_offset;
+    itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
     ItemIdData  newitemids[MaxIndexTuplesPerPage];
-    itemIdSort  itemidptr;
+    itemIdCompact itemidptr;
     ItemId      lp;
     int         nline,
                 nused;
@@ -842,6 +1051,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     unsigned    offset;
     int         nextitm;
     OffsetNumber offnum;
+    bool        presorted = true;   /* For now */
 
     Assert(nitems <= MaxIndexTuplesPerPage);
@@ -883,6 +1093,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     totallen = 0;
     nused = 0;
     nextitm = 0;
+    last_offset = pd_special;
     for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
     {
         lp = PageGetItemId(page, offnum);
@@ -906,6 +1117,12 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
         {
             itemidptr->offsetindex = nused; /* where it will go */
             itemidptr->itemoff = offset;
+
+            if (last_offset > itemidptr->itemoff)
+                last_offset = itemidptr->itemoff;
+            else
+                presorted = false;
+
             itemidptr->alignedlen = MAXALIGN(size);
             totallen += itemidptr->alignedlen;
             newitemids[nused] = *lp;
@@ -932,7 +1149,10 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
 
     /* and compactify the tuple data */
-    compactify_tuples(itemidbase, nused, page);
+    if (nused > 0)
+        compactify_tuples(itemidbase, nused, page, presorted);
+    else
+        phdr->pd_upper = pd_special;
 }
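
Two details in this last hunk are worth spelling out. First, compactify_tuples() now asserts nitems > 0, so the case where every item was deleted is handled by the caller: with no tuple holding storage, the tuple area is empty and pd_upper simply rises to pd_special. Second, when presorted is false, compactify_tuples() stages tuples through a temporary buffer rather than memmove()ing them in place. The sketch below shows why that staging makes arbitrary orders safe; it reuses the illustrative item struct from the earlier sketch, toy_compact_unsorted() is a hypothetical name, and unlike the real code it copies item-by-item instead of batching adjacent memcpy() calls and using a stack PGAlignedBlock.

#include <assert.h>
#include <stdio.h>
#include <string.h>

#define TOY_BUF_SIZE 32

typedef struct
{
    int     itemoff;
    int     alignedlen;
} item;

/*
 * Compact items, given in any order, to the end of buf.  Moving directly
 * could overwrite an item that has not been relocated yet, so each movable
 * item is first staged at its current offset in a scratch copy; copying
 * back from the scratch buffer can then never clobber unread source bytes.
 */
static int
toy_compact_unsorted(char *buf, int bufsize, item *items, int nitems)
{
    char    scratch[TOY_BUF_SIZE];
    int     upper = bufsize;

    assert(bufsize <= TOY_BUF_SIZE);

    /* stage each item's bytes at its current offset in the scratch copy */
    for (int i = 0; i < nitems; i++)
        memcpy(scratch + items[i].itemoff, buf + items[i].itemoff,
               items[i].alignedlen);

    /* copy back in array order, packing towards the end of the buffer */
    for (int i = 0; i < nitems; i++)
    {
        upper -= items[i].alignedlen;
        memcpy(buf + upper, scratch + items[i].itemoff, items[i].alignedlen);
        items[i].itemoff = upper;
    }
    return upper;
}

int
main(void)
{
    /* same data as before, but listed in line pointer order, not by offset */
    char    buf[17] = "....C..BBB....AA";
    item    items[] = {{4, 1}, {14, 2}, {7, 3}};
    int     upper = toy_compact_unsorted(buf, 16, items, 3);

    printf("upper=%d data=%.*s\n", upper, 16 - upper, buf + upper);
    /* prints: upper=10 data=BBBAAC */
    return 0;
}

Note that writing "C" to offset 15 overwrites a byte of "AA" before "AA" itself has moved; the staged copy is what makes that harmless. And because the copy-back walks the array in line pointer order while destinations descend from the buffer's end, the items land in reverse line pointer order, which is exactly the layout that lets the next call take the fast presorted path.
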