@ -8,7 +8,7 @@
* Portions Copyright ( c ) 1994 , Regents of the University of California
*
* IDENTIFICATION
* $ PostgreSQL : pgsql / src / backend / access / nbtree / nbtxlog . c , v 1.41 2007 / 02 / 01 19 : 10 : 25 momjian Exp $
* $ PostgreSQL : pgsql / src / backend / access / nbtree / nbtxlog . c , v 1.42 2007 / 02 / 08 05 : 05 : 53 momjian Exp $
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -264,122 +264,165 @@ btree_xlog_split(bool onleft, bool isroot,
{
xl_btree_split * xlrec = ( xl_btree_split * ) XLogRecGetData ( record ) ;
Relation reln ;
BlockNumber targetblk ;
OffsetNumber targetoff ;
BlockNumber leftsib ;
BlockNumber rightsib ;
BlockNumber downlink = 0 ;
Buffer buffer ;
Page page ;
BTPageOpaque pageop ;
Buffer lbuf , rbuf ;
Page lpage , rpage ;
BTPageOpaque ropaque , lopaque ;
char * datapos ;
int datalen ;
bool bkp_left = record - > xl_info & XLR_BKP_BLOCK_1 ;
bool bkp_nextsib = record - > xl_info & XLR_BKP_BLOCK_2 ;
OffsetNumber newitemoff ;
Item newitem = NULL ;
Size newitemsz = 0 ;
reln = XLogOpenRelation ( xlrec - > target . node ) ;
targetblk = ItemPointerGetBlockNumber ( & ( xlrec - > target . tid ) ) ;
targetoff = ItemPointerGetOffsetNumber ( & ( xlrec - > target . tid ) ) ;
leftsib = ( onleft ) ? targetblk : xlrec - > otherblk ;
rightsib = ( onleft ) ? xlrec - > otherblk : targetblk ;
reln = XLogOpenRelation ( xlrec - > node ) ;
/* Left (original) sibling */
buffer = XLogReadBuffer ( reln , leftsib , true ) ;
Assert ( BufferIsValid ( buffer ) ) ;
page = ( Page ) BufferGetPage ( buffer ) ;
datapos = ( char * ) xlrec + SizeOfBtreeSplit ;
datalen = record - > xl_len - SizeOfBtreeSplit ;
_bt_pageinit ( page , BufferGetPageSize ( buffer ) ) ;
pageop = ( BTPageOpaque ) PageGetSpecialPointer ( page ) ;
/* Forget any split this insertion completes */
if ( xlrec - > level > 0 )
{
BlockNumber downlink = BlockIdGetBlockNumber ( ( BlockId ) datapos ) ;
pageop - > btpo_prev = xlrec - > leftblk ;
pageop - > btpo_next = rightsib ;
pageop - > btpo . level = xlrec - > level ;
pageop - > btpo_flags = ( xlrec - > level = = 0 ) ? BTP_LEAF : 0 ;
pageop - > btpo_cycleid = 0 ;
datapos + = sizeof ( BlockIdData ) ;
datalen - = sizeof ( BlockIdData ) ;
forget_matching_split ( xlrec - > node , downlink , false ) ;
}
_bt_restore_page ( page ,
( char * ) xlrec + SizeOfBtreeSplit ,
xlrec - > leftlen ) ;
if ( onleft & & xlrec - > level > 0 )
/* Extract newitem and newitemoff */
if ( ! bkp_left & & onleft )
{
IndexTuple itup ;
IndexTupleData itupdata ;
/* extract downlink in the target tuple */
itup = ( IndexTuple ) PageGetItem ( page , PageGetItemId ( page , targetoff ) ) ;
downlink = ItemPointerGetBlockNumber ( & ( itup - > t_tid ) ) ;
Assert ( ItemPointerGetOffsetNumber ( & ( itup - > t_tid ) ) = = P_HIKEY ) ;
/* Extract the offset of the new tuple and it's contents */
memcpy ( & newitemoff , datapos , sizeof ( OffsetNumber ) ) ;
datapos + = sizeof ( OffsetNumber ) ;
datalen - = sizeof ( OffsetNumber ) ;
newitem = datapos ;
/* Need to copy tuple header due to alignment considerations */
memcpy ( & itupdata , datapos , sizeof ( IndexTupleData ) ) ;
newitemsz = IndexTupleDSize ( itupdata ) ;
newitemsz = MAXALIGN ( newitemsz ) ;
datapos + = newitemsz ;
datalen - = newitemsz ;
}
PageSetLSN ( page , lsn ) ;
PageSetTLI ( page , ThisTimeLineID ) ;
MarkBufferDirty ( buffer ) ;
UnlockReleaseBuffer ( buffer ) ;
/* Reconstruct right (new) sibling */
rbuf = XLogReadBuffer ( reln , xlrec - > rightsib , true ) ;
Assert ( BufferIsValid ( rbuf ) ) ;
rpage = ( Page ) BufferGetPage ( rbuf ) ;
/* Right (new) sibling */
buffer = XLogReadBuffer ( reln , rightsib , true ) ;
Assert ( BufferIsValid ( buffer ) ) ;
page = ( Page ) BufferGetPage ( buffer ) ;
_bt_pageinit ( rpage , BufferGetPageSize ( rbuf ) ) ;
ropaque = ( BTPageOpaque ) PageGetSpecialPointer ( rpage ) ;
_bt_pageinit ( page , BufferGetPageSize ( buffer ) ) ;
pageop = ( BTPageOpaque ) PageGetSpecialPointer ( page ) ;
ropaque - > btpo_prev = xlrec - > leftsib ;
ropaque - > btpo_next = xlrec - > rnext ;
ropaque - > btpo . level = xlrec - > level ;
ropaque - > btpo_flags = ( xlrec - > level = = 0 ) ? BTP_LEAF : 0 ;
ropaque - > btpo_cycleid = 0 ;
pageop - > btpo_prev = leftsib ;
pageop - > btpo_next = xlrec - > rightblk ;
pageop - > btpo . level = xlrec - > level ;
pageop - > btpo_flags = ( xlrec - > level = = 0 ) ? BTP_LEAF : 0 ;
pageop - > btpo_cycleid = 0 ;
_bt_restore_page ( rpage , datapos , datalen ) ;
_bt_restore_page ( page ,
( char * ) xlrec + SizeOfBtreeSplit + xlrec - > leftlen ,
record - > xl_len - SizeOfBtreeSplit - xlrec - > leftlen ) ;
PageSetLSN ( rpage , lsn ) ;
PageSetTLI ( rpage , ThisTimeLineID ) ;
MarkBufferDirty ( rbuf ) ;
if ( ! onleft & & xlrec - > level > 0 )
{
IndexTuple itup ;
/* don't release the buffer yet, because reconstructing the left sibling
* needs to access the data on the right page
*/
/* extract downlink in the target tuple */
itup = ( IndexTuple ) PageGetItem ( page , PageGetItemId ( page , targetoff ) ) ;
downlink = ItemPointerGetBlockNumber ( & ( itup - > t_tid ) ) ;
Assert ( ItemPointerGetOffsetNumber ( & ( itup - > t_tid ) ) = = P_HIKEY ) ;
}
PageSetLSN ( page , lsn ) ;
PageSetTLI ( page , ThisTimeLineID ) ;
MarkBufferDirty ( buffer ) ;
UnlockReleaseBuffer ( buffer ) ;
/* Reconstruct left (original) sibling */
/* Fix left-link of right (next) page */
if ( ! ( record - > xl_info & XLR_BKP_BLOCK_1 ) )
if ( ! bkp_left )
{
if ( xlrec - > rightblk ! = P_NONE )
lbuf = XLogReadBuffer ( reln , xlrec - > leftsib , false ) ;
if ( BufferIsValid ( lbuf ) )
{
buffer = XLogReadBuffer ( reln , xlrec - > rightblk , false ) ;
if ( BufferIsValid ( buffer ) )
{
page = ( Page ) BufferGetPage ( buffer ) ;
lpage = ( Page ) BufferGetPage ( lbuf ) ;
lopaque = ( BTPageOpaque ) PageGetSpecialPointer ( lpage ) ;
if ( XLByteLE ( lsn , PageGetLSN ( page ) ) )
if ( ! XLByteLE ( lsn , PageGetLSN ( lpage ) ) )
{
/* Remove the items from the left page that were copied to
* right page , and add the new item if it was inserted to
* left page .
*/
OffsetNumber off ;
OffsetNumber maxoff = PageGetMaxOffsetNumber ( lpage ) ;
ItemId hiItemId ;
Item hiItem ;
for ( off = maxoff ; off > = xlrec - > firstright ; off - - )
PageIndexTupleDelete ( lpage , off ) ;
if ( onleft )
{
UnlockReleaseBuffer ( buffer ) ;
if ( PageAddItem ( lpage , newitem , newitemsz , newitemoff ,
LP_USED ) = = InvalidOffsetNumber )
elog ( PANIC , " can't add new item to left sibling after split " ) ;
}
else
{
pageop = ( BTPageOpaque ) PageGetSpecialPointer ( page ) ;
pageop - > btpo_prev = rightsib ;
/* Set high key equal to the first key on the right page */
hiItemId = PageGetItemId ( rpage , P_FIRSTDATAKEY ( ropaque ) ) ;
hiItem = PageGetItem ( rpage , hiItemId ) ;
PageSetLSN ( page , lsn ) ;
PageSetTLI ( page , ThisTimeLineID ) ;
MarkBufferDirty ( buffer ) ;
UnlockReleaseBuffer ( buffer ) ;
if ( ! P_RIGHTMOST ( lopaque ) )
{
/* but remove the old high key first */
PageIndexTupleDelete ( lpage , P_HIKEY ) ;
}
if ( PageAddItem ( lpage , hiItem , ItemIdGetLength ( hiItemId ) ,
P_HIKEY , LP_USED ) = = InvalidOffsetNumber )
elog ( PANIC , " can't add high key after split to left page " ) ;
/* Fix opaque fields */
lopaque - > btpo_flags = ( xlrec - > level = = 0 ) ? BTP_LEAF : 0 ;
lopaque - > btpo_next = xlrec - > rightsib ;
lopaque - > btpo_cycleid = 0 ;
PageSetLSN ( lpage , lsn ) ;
PageSetTLI ( lpage , ThisTimeLineID ) ;
MarkBufferDirty ( lbuf ) ;
}
UnlockReleaseBuffer ( lbuf ) ;
}
}
/* Forget any split this insertion completes */
if ( xlrec - > level > 0 )
forget_matching_split ( xlrec - > target . node , downlink , false ) ;
/* we no longer need the right buffer. */
UnlockReleaseBuffer ( rbuf ) ;
/* Fix left-link of the page to the right of the new right sibling */
if ( ! bkp_nextsib & & xlrec - > rnext ! = P_NONE )
{
Buffer buffer = XLogReadBuffer ( reln , xlrec - > rnext , false ) ;
if ( BufferIsValid ( buffer ) )
{
Page page = ( Page ) BufferGetPage ( buffer ) ;
if ( ! XLByteLE ( lsn , PageGetLSN ( page ) ) )
{
BTPageOpaque pageop = ( BTPageOpaque ) PageGetSpecialPointer ( page ) ;
pageop - > btpo_prev = xlrec - > rightsib ;
PageSetLSN ( page , lsn ) ;
PageSetTLI ( page , ThisTimeLineID ) ;
MarkBufferDirty ( buffer ) ;
}
UnlockReleaseBuffer ( buffer ) ;
}
}
/* The job ain't done till the parent link is inserted... */
log_incomplete_split ( xlrec - > target . node ,
leftsib , rightsib , isroot ) ;
log_incomplete_split ( xlrec - > node ,
xlrec - > leftsib , xlrec - > rightsib , isroot ) ;
}
static void
@ -727,40 +770,48 @@ btree_desc(StringInfo buf, uint8 xl_info, char *rec)
{
xl_btree_split * xlrec = ( xl_btree_split * ) rec ;
appendStringInfo ( buf , " split_l: " ) ;
out_target ( buf , & ( xlrec - > target ) ) ;
appendStringInfo ( buf , " ; oth %u; rgh %u " ,
xlrec - > otherblk , xlrec - > rightblk ) ;
appendStringInfo ( buf , " split_l: rel %u/%u/%u " ,
xlrec - > node . spcNode , xlrec - > node . dbNode ,
xlrec - > node . relNode ) ;
appendStringInfo ( buf , " left %u, right %u off %u level %u " ,
xlrec - > leftsib , xlrec - > rightsib ,
xlrec - > firstright , xlrec - > level ) ;
break ;
}
case XLOG_BTREE_SPLIT_R :
{
xl_btree_split * xlrec = ( xl_btree_split * ) rec ;
appendStringInfo ( buf , " split_r: " ) ;
out_target ( buf , & ( xlrec - > target ) ) ;
appendStringInfo ( buf , " ; oth %u; rgh %u " ,
xlrec - > otherblk , xlrec - > rightblk ) ;
appendStringInfo ( buf , " split_r: rel %u/%u/%u " ,
xlrec - > node . spcNode , xlrec - > node . dbNode ,
xlrec - > node . relNode ) ;
appendStringInfo ( buf , " left %u, right %u off %u level %u " ,
xlrec - > leftsib , xlrec - > rightsib ,
xlrec - > firstright , xlrec - > level ) ;
break ;
}
case XLOG_BTREE_SPLIT_L_ROOT :
{
xl_btree_split * xlrec = ( xl_btree_split * ) rec ;
appendStringInfo ( buf , " split_l_root: " ) ;
out_target ( buf , & ( xlrec - > target ) ) ;
appendStringInfo ( buf , " ; oth %u; rgh %u " ,
xlrec - > otherblk , xlrec - > rightblk ) ;
appendStringInfo ( buf , " split_l_root: rel %u/%u/%u " ,
xlrec - > node . spcNode , xlrec - > node . dbNode ,
xlrec - > node . relNode ) ;
appendStringInfo ( buf , " left %u, right %u off %u level %u " ,
xlrec - > leftsib , xlrec - > rightsib ,
xlrec - > firstright , xlrec - > level ) ;
break ;
}
case XLOG_BTREE_SPLIT_R_ROOT :
{
xl_btree_split * xlrec = ( xl_btree_split * ) rec ;
appendStringInfo ( buf , " split_r_root: " ) ;
out_target ( buf , & ( xlrec - > target ) ) ;
appendStringInfo ( buf , " ; oth %u; rgh %u " ,
xlrec - > otherblk , xlrec - > rightblk ) ;
appendStringInfo ( buf , " split_r_root: rel %u/%u/%u " ,
xlrec - > node . spcNode , xlrec - > node . dbNode ,
xlrec - > node . relNode ) ;
appendStringInfo ( buf , " left %u, right %u off %u level %u " ,
xlrec - > leftsib , xlrec - > rightsib ,
xlrec - > firstright , xlrec - > level ) ;
break ;
}
case XLOG_BTREE_DELETE :