@ -200,7 +200,7 @@ saveNodeLink(Relation index, SPPageDesc *parent,
*/
static void
addLeafTuple ( Relation index , SpGistState * state , SpGistLeafTuple leafTuple ,
SPPageDesc * current , SPPageDesc * parent , bool isNew )
SPPageDesc * current , SPPageDesc * parent , bool isNulls , bool isN ew )
{
XLogRecData rdata [ 4 ] ;
spgxlogAddLeaf xlrec ;
@ -208,6 +208,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
xlrec . node = index - > rd_node ;
xlrec . blknoLeaf = current - > blkno ;
xlrec . newPage = isNew ;
xlrec . storesNulls = isNulls ;
/* these will be filled below as needed */
xlrec . offnumLeaf = InvalidOffsetNumber ;
@ -224,7 +225,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
START_CRIT_SECTION ( ) ;
if ( current - > offnum = = InvalidOffsetNumber | |
current - > blkno = = SPGIST_HEAD_BLKNO )
SpGistBlockIsRoot ( current - > blkno ) )
{
/* Tuple is not part of a chain */
leafTuple - > nextOffset = InvalidOffsetNumber ;
@ -337,7 +338,7 @@ checkSplitConditions(Relation index, SpGistState *state,
n = 0 ,
totalSize = 0 ;
if ( current - > blkno = = SPGIST_HEAD_BLKNO )
if ( SpGistBlockIsRoot ( current - > blkno ) )
{
/* return impossible values to force split */
* nToSplit = BLCKSZ ;
@ -386,7 +387,7 @@ checkSplitConditions(Relation index, SpGistState *state,
static void
moveLeafs ( Relation index , SpGistState * state ,
SPPageDesc * current , SPPageDesc * parent ,
SpGistLeafTuple newLeafTuple )
SpGistLeafTuple newLeafTuple , bool isNulls )
{
int i ,
nDelete ,
@ -451,7 +452,8 @@ moveLeafs(Relation index, SpGistState *state,
}
/* Find a leaf page that will hold them */
nbuf = SpGistGetBuffer ( index , GBUF_LEAF , size , & xlrec . newPage ) ;
nbuf = SpGistGetBuffer ( index , GBUF_LEAF | ( isNulls ? GBUF_NULLS : 0 ) ,
size , & xlrec . newPage ) ;
npage = BufferGetPage ( nbuf ) ;
nblkno = BufferGetBlockNumber ( nbuf ) ;
Assert ( nblkno ! = current - > blkno ) ;
@ -464,6 +466,7 @@ moveLeafs(Relation index, SpGistState *state,
xlrec . blknoDst = nblkno ;
xlrec . nMoves = nDelete ;
xlrec . replaceDead = replaceDead ;
xlrec . storesNulls = isNulls ;
xlrec . blknoParent = parent - > blkno ;
xlrec . offnumParent = parent - > offnum ;
@ -584,6 +587,8 @@ setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
* If so , randomly divide the tuples into several nodes ( all with the same
* label ) and return TRUE to select allTheSame mode for this inner tuple .
*
* ( This code is also used to forcibly select allTheSame mode for nulls . )
*
* If we know that the leaf tuples wouldn ' t all fit on one page , then we
* exclude the last tuple ( which is the incoming new tuple that forced a split )
* from the check to see if more than one node is used . The reason for this
@ -674,7 +679,8 @@ checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
static bool
doPickSplit ( Relation index , SpGistState * state ,
SPPageDesc * current , SPPageDesc * parent ,
SpGistLeafTuple newLeafTuple , int level , bool isNew )
SpGistLeafTuple newLeafTuple ,
int level , bool isNulls , bool isNew )
{
bool insertedNew = false ;
spgPickSplitIn in ;
@ -733,11 +739,18 @@ doPickSplit(Relation index, SpGistState *state,
* also , count up the amount of space that will be freed from current .
* ( Note that in the non - root case , we won ' t actually delete the old
* tuples , only replace them with redirects or placeholders . )
*
* Note : the SGLTDATUM calls here are safe even when dealing with a nulls
* page . For a pass - by - value data type we will fetch a word that must
* exist even though it may contain garbage ( because of the fact that leaf
* tuples must have size at least SGDTSIZE ) . For a pass - by - reference type
* we are just computing a pointer that isn ' t going to get dereferenced .
* So it ' s not worth guarding the calls with isNulls checks .
*/
nToInsert = 0 ;
nToDelete = 0 ;
spaceToDelete = 0 ;
if ( current - > blkno = = SPGIST_HEAD_BLKNO )
if ( SpGistBlockIsRoot ( current - > blkno ) )
{
/*
* We are splitting the root ( which up to now is also a leaf page ) .
@ -813,26 +826,53 @@ doPickSplit(Relation index, SpGistState *state,
heapPtrs [ in . nTuples ] = newLeafTuple - > heapPtr ;
in . nTuples + + ;
/*
* Perform split using user - defined method .
*/
memset ( & out , 0 , sizeof ( out ) ) ;
procinfo = index_getprocinfo ( index , 1 , SPGIST_PICKSPLIT_PROC ) ;
FunctionCall2Coll ( procinfo ,
index - > rd_indcollation [ 0 ] ,
PointerGetDatum ( & in ) ,
PointerGetDatum ( & out ) ) ;
if ( ! isNulls )
{
/*
* Perform split using user - defined method .
*/
procinfo = index_getprocinfo ( index , 1 , SPGIST_PICKSPLIT_PROC ) ;
FunctionCall2Coll ( procinfo ,
index - > rd_indcollation [ 0 ] ,
PointerGetDatum ( & in ) ,
PointerGetDatum ( & out ) ) ;
/*
* Form new leaf tuples and count up the total space needed .
*/
totalLeafSizes = 0 ;
for ( i = 0 ; i < in . nTuples ; i + + )
/*
* Form new leaf tuples and count up the total space needed .
*/
totalLeafSizes = 0 ;
for ( i = 0 ; i < in . nTuples ; i + + )
{
newLeafs [ i ] = spgFormLeafTuple ( state , heapPtrs + i ,
out . leafTupleDatums [ i ] ,
false ) ;
totalLeafSizes + = newLeafs [ i ] - > size + sizeof ( ItemIdData ) ;
}
}
else
{
newLeafs [ i ] = spgFormLeafTuple ( state , heapPtrs + i ,
out . leafTupleDatums [ i ] ) ;
totalLeafSizes + = newLeafs [ i ] - > size + sizeof ( ItemIdData ) ;
/*
* Perform dummy split that puts all tuples into one node .
* checkAllTheSame will override this and force allTheSame mode .
*/
out . hasPrefix = false ;
out . nNodes = 1 ;
out . nodeLabels = NULL ;
out . mapTuplesToNodes = palloc0 ( sizeof ( int ) * in . nTuples ) ;
/*
* Form new leaf tuples and count up the total space needed .
*/
totalLeafSizes = 0 ;
for ( i = 0 ; i < in . nTuples ; i + + )
{
newLeafs [ i ] = spgFormLeafTuple ( state , heapPtrs + i ,
( Datum ) 0 ,
true ) ;
totalLeafSizes + = newLeafs [ i ] - > size + sizeof ( ItemIdData ) ;
}
}
/*
@ -872,11 +912,11 @@ doPickSplit(Relation index, SpGistState *state,
for ( i = 0 ; i < out . nNodes ; i + + )
{
Datum label = ( Datum ) 0 ;
bool isnull = ( out . nodeLabels = = NULL ) ;
bool label isnull = ( out . nodeLabels = = NULL ) ;
if ( ! isnull )
if ( ! label isnull)
label = out . nodeLabels [ i ] ;
nodes [ i ] = spgFormNodeTuple ( state , label , isnull ) ;
nodes [ i ] = spgFormNodeTuple ( state , label , label isnull) ;
}
innerTuple = spgFormInnerTuple ( state ,
out . hasPrefix , out . prefixDatum ,
@ -914,7 +954,7 @@ doPickSplit(Relation index, SpGistState *state,
*/
xlrec . initInner = false ;
if ( parent - > buffer ! = InvalidBuffer & &
parent - > blkno ! = SPGIST_HEAD_BLKNO & &
! SpGistBlockIsRoot ( parent - > blkno ) & &
( SpGistPageGetFreeSpace ( parent - > page , 1 ) > =
innerTuple - > size + sizeof ( ItemIdData ) ) )
{
@ -925,7 +965,8 @@ doPickSplit(Relation index, SpGistState *state,
{
/* Send tuple to page with next triple parity (see README) */
newInnerBuffer = SpGistGetBuffer ( index ,
GBUF_INNER_PARITY ( parent - > blkno + 1 ) ,
GBUF_INNER_PARITY ( parent - > blkno + 1 ) |
( isNulls ? GBUF_NULLS : 0 ) ,
innerTuple - > size + sizeof ( ItemIdData ) ,
& xlrec . initInner ) ;
}
@ -935,7 +976,7 @@ doPickSplit(Relation index, SpGistState *state,
newInnerBuffer = InvalidBuffer ;
}
/*----------
/*
* Because a WAL record can ' t involve more than four buffers , we can
* only afford to deal with two leaf pages in each picksplit action ,
* ie the current page and at most one other .
@ -956,9 +997,8 @@ doPickSplit(Relation index, SpGistState *state,
* If we are splitting the root page ( turning it from a leaf page into an
* inner page ) , then no leaf tuples can go back to the current page ; they
* must all go somewhere else .
* - - - - - - - - - -
*/
if ( current - > blkno ! = SPGIST_HEAD_BLKNO )
if ( ! SpGistBlockIsRoot ( current - > blkno ) )
currentFreeSpace = PageGetExactFreeSpace ( current - > page ) + spaceToDelete ;
else
currentFreeSpace = 0 ; /* prevent assigning any tuples to current */
@ -996,7 +1036,8 @@ doPickSplit(Relation index, SpGistState *state,
int curspace ;
int newspace ;
newLeafBuffer = SpGistGetBuffer ( index , GBUF_LEAF ,
newLeafBuffer = SpGistGetBuffer ( index ,
GBUF_LEAF | ( isNulls ? GBUF_NULLS : 0 ) ,
Min ( totalLeafSizes ,
SPGIST_PAGE_CAPACITY ) ,
& xlrec . initDest ) ;
@ -1076,6 +1117,7 @@ doPickSplit(Relation index, SpGistState *state,
xlrec . blknoDest = InvalidBlockNumber ;
xlrec . nDelete = 0 ;
xlrec . initSrc = isNew ;
xlrec . storesNulls = isNulls ;
leafdata = leafptr = ( char * ) palloc ( totalLeafSizes ) ;
@ -1091,7 +1133,7 @@ doPickSplit(Relation index, SpGistState *state,
* the root ; in that case there ' s no need because we ' ll re - init the page
* below . We do this first to make room for reinserting new leaf tuples .
*/
if ( current - > blkno ! = SPGIST_HEAD_BLKNO )
if ( ! SpGistBlockIsRoot ( current - > blkno ) )
{
/*
* Init buffer instead of deleting individual tuples , but only if
@ -1102,7 +1144,8 @@ doPickSplit(Relation index, SpGistState *state,
nToDelete + SpGistPageGetOpaque ( current - > page ) - > nPlaceholder = =
PageGetMaxOffsetNumber ( current - > page ) )
{
SpGistInitBuffer ( current - > buffer , SPGIST_LEAF ) ;
SpGistInitBuffer ( current - > buffer ,
SPGIST_LEAF | ( isNulls ? SPGIST_NULLS : 0 ) ) ;
xlrec . initSrc = true ;
}
else if ( isNew )
@ -1317,10 +1360,10 @@ doPickSplit(Relation index, SpGistState *state,
* Splitting root page , which was a leaf but now becomes inner page
* ( and so " current " continues to point at it )
*/
Assert ( current - > blkno = = SPGIST_HEAD_BLKNO ) ;
Assert ( SpGistBlockIsRoot ( current - > blkno ) ) ;
Assert ( redirectTuplePos = = InvalidOffsetNumber ) ;
SpGistInitBuffer ( current - > buffer , 0 ) ;
SpGistInitBuffer ( current - > buffer , ( isNulls ? SPGIST_NULLS : 0 ) ) ;
xlrec . initInner = true ;
xlrec . blknoInner = current - > blkno ;
@ -1461,6 +1504,9 @@ spgAddNodeAction(Relation index, SpGistState *state,
XLogRecData rdata [ 5 ] ;
spgxlogAddNode xlrec ;
/* Should not be applied to nulls */
Assert ( ! SpGistPageStoresNulls ( current - > page ) ) ;
/* Construct new inner tuple with additional node */
newInnerTuple = addNode ( state , innerTuple , nodeLabel , nodeN ) ;
@ -1527,7 +1573,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
* allow only one inner tuple on the root page , and spgFormInnerTuple
* always checks that inner tuples don ' t exceed the size of a page .
*/
if ( current - > blkno = = SPGIST_HEAD_BLKNO )
if ( SpGistBlockIsRoot ( current - > blkno ) )
elog ( ERROR , " cannot enlarge root tuple any more " ) ;
Assert ( parent - > buffer ! = InvalidBuffer ) ;
@ -1657,6 +1703,9 @@ spgSplitNodeAction(Relation index, SpGistState *state,
spgxlogSplitTuple xlrec ;
Buffer newBuffer = InvalidBuffer ;
/* Should not be applied to nulls */
Assert ( ! SpGistPageStoresNulls ( current - > page ) ) ;
/*
* Construct new prefix tuple , containing a single node with the
* specified label . ( We ' ll update the node ' s downlink to point to the
@ -1709,7 +1758,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
* For the space calculation , note that prefixTuple replaces innerTuple
* but postfixTuple will be a new entry .
*/
if ( current - > blkno = = SPGIST_HEAD_BLKNO | |
if ( SpGistBlockIsRoot ( current - > blkno ) | |
SpGistPageGetFreeSpace ( current - > page , 1 ) + innerTuple - > size <
prefixTuple - > size + postfixTuple - > size + sizeof ( ItemIdData ) )
{
@ -1804,7 +1853,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
*/
void
spgdoinsert ( Relation index , SpGistState * state ,
ItemPointer heapPtr , Datum datum )
ItemPointer heapPtr , Datum datum , bool isnull )
{
int level = 0 ;
Datum leafDatum ;
@ -1817,7 +1866,7 @@ spgdoinsert(Relation index, SpGistState *state,
* value to be inserted is not toasted ; FormIndexDatum doesn ' t guarantee
* that .
*/
if ( state - > attType . attlen = = - 1 )
if ( ! isnull & & state - > attType . attlen = = - 1 )
datum = PointerGetDatum ( PG_DETOAST_DATUM ( datum ) ) ;
leafDatum = datum ;
@ -1828,8 +1877,11 @@ spgdoinsert(Relation index, SpGistState *state,
* If it isn ' t gonna fit , and the opclass can ' t reduce the datum size by
* suffixing , bail out now rather than getting into an endless loop .
*/
leafSize = SGLTHDRSZ + sizeof ( ItemIdData ) +
SpGistGetTypeSize ( & state - > attType , leafDatum ) ;
if ( ! isnull )
leafSize = SGLTHDRSZ + sizeof ( ItemIdData ) +
SpGistGetTypeSize ( & state - > attType , leafDatum ) ;
else
leafSize = SGDTSIZE + sizeof ( ItemIdData ) ;
if ( leafSize > SPGIST_PAGE_CAPACITY & & ! state - > config . longValuesOK )
ereport ( ERROR ,
@ -1840,8 +1892,8 @@ spgdoinsert(Relation index, SpGistState *state,
RelationGetRelationName ( index ) ) ,
errhint ( " Values larger than a buffer page cannot be indexed. " ) ) ) ;
/* Initialize "current" to the root page */
current . blkno = SPGIST_HEAD _BLKNO;
/* Initialize "current" to the appropriate root page */
current . blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT _BLKNO;
current . buffer = InvalidBuffer ;
current . page = NULL ;
current . offnum = FirstOffsetNumber ;
@ -1873,10 +1925,11 @@ spgdoinsert(Relation index, SpGistState *state,
* for doPickSplit to always have a leaf page at hand ; so just
* quietly limit our request to a page size .
*/
current . buffer = SpGistGetBuffer ( index , GBUF_LEAF ,
Min ( leafSize ,
SPGIST_PAGE_CAPACITY ) ,
& isNew ) ;
current . buffer =
SpGistGetBuffer ( index ,
GBUF_LEAF | ( isnull ? GBUF_NULLS : 0 ) ,
Min ( leafSize , SPGIST_PAGE_CAPACITY ) ,
& isNew ) ;
current . blkno = BufferGetBlockNumber ( current . buffer ) ;
}
else if ( parent . buffer = = InvalidBuffer | |
@ -1892,19 +1945,25 @@ spgdoinsert(Relation index, SpGistState *state,
}
current . page = BufferGetPage ( current . buffer ) ;
/* should not arrive at a page of the wrong type */
if ( isnull ? ! SpGistPageStoresNulls ( current . page ) :
SpGistPageStoresNulls ( current . page ) )
elog ( ERROR , " SPGiST index page %u has wrong nulls flag " ,
current . blkno ) ;
if ( SpGistPageIsLeaf ( current . page ) )
{
SpGistLeafTuple leafTuple ;
int nToSplit ,
sizeToSplit ;
leafTuple = spgFormLeafTuple ( state , heapPtr , leafDatum ) ;
leafTuple = spgFormLeafTuple ( state , heapPtr , leafDatum , isnull ) ;
if ( leafTuple - > size + sizeof ( ItemIdData ) < =
SpGistPageGetFreeSpace ( current . page , 1 ) )
{
/* it fits on page, so insert it and we're done */
addLeafTuple ( index , state , leafTuple ,
& current , & parent , isNew ) ;
& current , & parent , isnull , is New ) ;
break ;
}
else if ( ( sizeToSplit =
@ -1918,14 +1977,14 @@ spgdoinsert(Relation index, SpGistState *state,
* chain to another leaf page rather than splitting it .
*/
Assert ( ! isNew ) ;
moveLeafs ( index , state , & current , & parent , leafTuple ) ;
moveLeafs ( index , state , & current , & parent , leafTuple , isnull ) ;
break ; /* we're done */
}
else
{
/* picksplit */
if ( doPickSplit ( index , state , & current , & parent ,
leafTuple , level , isNew ) )
leafTuple , level , isnull , is New ) )
break ; /* doPickSplit installed new tuples */
/* leaf tuple will not be inserted yet */
@ -1972,11 +2031,20 @@ spgdoinsert(Relation index, SpGistState *state,
memset ( & out , 0 , sizeof ( out ) ) ;
procinfo = index_getprocinfo ( index , 1 , SPGIST_CHOOSE_PROC ) ;
FunctionCall2Coll ( procinfo ,
index - > rd_indcollation [ 0 ] ,
PointerGetDatum ( & in ) ,
PointerGetDatum ( & out ) ) ;
if ( ! isnull )
{
/* use user-defined choose method */
procinfo = index_getprocinfo ( index , 1 , SPGIST_CHOOSE_PROC ) ;
FunctionCall2Coll ( procinfo ,
index - > rd_indcollation [ 0 ] ,
PointerGetDatum ( & in ) ,
PointerGetDatum ( & out ) ) ;
}
else
{
/* force "match" action (to insert to random subnode) */
out . resultType = spgMatchNode ;
}
if ( innerTuple - > allTheSame )
{
@ -2001,9 +2069,12 @@ spgdoinsert(Relation index, SpGistState *state,
/* Adjust level as per opclass request */
level + = out . result . matchNode . levelAdd ;
/* Replace leafDatum and recompute leafSize */
leafDatum = out . result . matchNode . restDatum ;
leafSize = SGLTHDRSZ + sizeof ( ItemIdData ) +
SpGistGetTypeSize ( & state - > attType , leafDatum ) ;
if ( ! isnull )
{
leafDatum = out . result . matchNode . restDatum ;
leafSize = SGLTHDRSZ + sizeof ( ItemIdData ) +
SpGistGetTypeSize ( & state - > attType , leafDatum ) ;
}
/*
* Loop around and attempt to insert the new leafDatum