@ -75,6 +75,8 @@ typedef struct BtreeCheckState
bool readonly ;
/* Also verifying heap has no unindexed tuples? */
bool heapallindexed ;
/* Also making sure non-pivot tuples can be found by new search? */
bool rootdescend ;
/* Per-page context */
MemoryContext targetcontext ;
/* Buffer access strategy */
@ -124,10 +126,11 @@ PG_FUNCTION_INFO_V1(bt_index_check);
PG_FUNCTION_INFO_V1 ( bt_index_parent_check ) ;
static void bt_index_check_internal ( Oid indrelid , bool parentcheck ,
bool heapallindexed ) ;
bool heapallindexed , bool rootdescend ) ;
static inline void btree_index_checkable ( Relation rel ) ;
static void bt_check_every_level ( Relation rel , Relation heaprel ,
bool heapkeyspace , bool readonly , bool heapallindexed ) ;
bool heapkeyspace , bool readonly , bool heapallindexed ,
bool rootdescend ) ;
static BtreeLevel bt_check_level_from_leftmost ( BtreeCheckState * state ,
BtreeLevel level ) ;
static void bt_target_page_check ( BtreeCheckState * state ) ;
@ -140,6 +143,7 @@ static void bt_tuple_present_callback(Relation index, HeapTuple htup,
bool tupleIsAlive , void * checkstate ) ;
static IndexTuple bt_normalize_tuple ( BtreeCheckState * state ,
IndexTuple itup ) ;
static bool bt_rootdescend ( BtreeCheckState * state , IndexTuple itup ) ;
static inline bool offset_is_negative_infinity ( BTPageOpaque opaque ,
OffsetNumber offset ) ;
static inline bool invariant_l_offset ( BtreeCheckState * state , BTScanInsert key ,
@ -177,7 +181,7 @@ bt_index_check(PG_FUNCTION_ARGS)
if ( PG_NARGS ( ) = = 2 )
heapallindexed = PG_GETARG_BOOL ( 1 ) ;
bt_index_check_internal ( indrelid , false , heapallindexed ) ;
bt_index_check_internal ( indrelid , false , heapallindexed , false ) ;
PG_RETURN_VOID ( ) ;
}
@ -196,11 +200,14 @@ bt_index_parent_check(PG_FUNCTION_ARGS)
{
Oid indrelid = PG_GETARG_OID ( 0 ) ;
bool heapallindexed = false ;
bool rootdescend = false ;
if ( PG_NARGS ( ) = = 2 )
if ( PG_NARGS ( ) > = 2 )
heapallindexed = PG_GETARG_BOOL ( 1 ) ;
if ( PG_NARGS ( ) = = 3 )
rootdescend = PG_GETARG_BOOL ( 2 ) ;
bt_index_check_internal ( indrelid , true , heapallindexed ) ;
bt_index_check_internal ( indrelid , true , heapallindexed , rootdescend ) ;
PG_RETURN_VOID ( ) ;
}
@ -209,7 +216,8 @@ bt_index_parent_check(PG_FUNCTION_ARGS)
* Helper for bt_index_ [ parent_ ] check , coordinating the bulk of the work .
*/
static void
bt_index_check_internal ( Oid indrelid , bool parentcheck , bool heapallindexed )
bt_index_check_internal ( Oid indrelid , bool parentcheck , bool heapallindexed ,
bool rootdescend )
{
Oid heapid ;
Relation indrel ;
@ -267,7 +275,7 @@ bt_index_check_internal(Oid indrelid, bool parentcheck, bool heapallindexed)
/* Check index, possibly against table it is an index on */
heapkeyspace = _bt_heapkeyspace ( indrel ) ;
bt_check_every_level ( indrel , heaprel , heapkeyspace , parentcheck ,
heapallindexed ) ;
heapallindexed , rootdescend ) ;
/*
* Release locks early . That ' s ok here because nothing in the called
@ -338,7 +346,7 @@ btree_index_checkable(Relation rel)
*/
static void
bt_check_every_level ( Relation rel , Relation heaprel , bool heapkeyspace ,
bool readonly , bool heapallindexed )
bool readonly , bool heapallindexed , bool rootdescend )
{
BtreeCheckState * state ;
Page metapage ;
@ -362,6 +370,7 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
state - > heapkeyspace = heapkeyspace ;
state - > readonly = readonly ;
state - > heapallindexed = heapallindexed ;
state - > rootdescend = rootdescend ;
if ( state - > heapallindexed )
{
@ -430,6 +439,14 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
}
}
Assert ( ! state - > rootdescend | | state - > readonly ) ;
if ( state - > rootdescend & & ! state - > heapkeyspace )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot verify that tuples from index \" %s \" can each be found by an independent index search " ,
RelationGetRelationName ( rel ) ) ,
errhint ( " Only B-Tree version 4 indexes support rootdescend verification. " ) ) ) ;
/* Create context for page */
state - > targetcontext = AllocSetContextCreate ( CurrentMemoryContext ,
" amcheck context " ,
@ -922,6 +939,31 @@ bt_target_page_check(BtreeCheckState *state)
if ( offset_is_negative_infinity ( topaque , offset ) )
continue ;
/*
* Readonly callers may optionally verify that non - pivot tuples can
* each be found by an independent search that starts from the root
*/
if ( state - > rootdescend & & P_ISLEAF ( topaque ) & &
! bt_rootdescend ( state , itup ) )
{
char * itid ,
* htid ;
itid = psprintf ( " (%u,%u) " , state - > targetblock , offset ) ;
htid = psprintf ( " (%u,%u) " ,
ItemPointerGetBlockNumber ( & ( itup - > t_tid ) ) ,
ItemPointerGetOffsetNumber ( & ( itup - > t_tid ) ) ) ;
ereport ( ERROR ,
( errcode ( ERRCODE_INDEX_CORRUPTED ) ,
errmsg ( " could not find tuple using search from root page in index \" %s \" " ,
RelationGetRelationName ( state - > rel ) ) ,
errdetail_internal ( " Index tid=%s points to heap tid=%s page lsn=%X/%X. " ,
itid , htid ,
( uint32 ) ( state - > targetlsn > > 32 ) ,
( uint32 ) state - > targetlsn ) ) ) ;
}
/* Build insertion scankey for current page offset */
skey = bt_mkscankey_pivotsearch ( state - > rel , itup ) ;
@ -1526,6 +1568,9 @@ bt_downlink_check(BtreeCheckState *state, BTScanInsert targetkey,
* internal pages . In more general terms , a negative infinity item is
* only negative infinity with respect to the subtree that the page is
* at the root of .
*
* See also : bt_rootdescend ( ) , which can even detect transitive
* inconsistencies on cousin leaf pages .
*/
if ( offset_is_negative_infinity ( copaque , offset ) )
continue ;
@ -1926,6 +1971,81 @@ bt_normalize_tuple(BtreeCheckState *state, IndexTuple itup)
return reformed ;
}
/*
* Search for itup in index , starting from fast root page . itup must be a
* non - pivot tuple . This is only supported with heapkeyspace indexes , since
* we rely on having fully unique keys to find a match with only a signle
* visit to a leaf page , barring an interrupted page split , where we may have
* to move right . ( A concurrent page split is impossible because caller must
* be readonly caller . )
*
* This routine can detect very subtle transitive consistency issues across
* more than one level of the tree . Leaf pages all have a high key ( even the
* rightmost page has a conceptual positive infinity high key ) , but not a low
* key . Their downlink in parent is a lower bound , which along with the high
* key is almost enough to detect every possible inconsistency . A downlink
* separator key value won ' t always be available from parent , though , because
* the first items of internal pages are negative infinity items , truncated
* down to zero attributes during internal page splits . While it ' s true that
* bt_downlink_check ( ) and the high key check can detect most imaginable key
* space problems , there are remaining problems it won ' t detect with non - pivot
* tuples in cousin leaf pages . Starting a search from the root for every
* existing leaf tuple detects small inconsistencies in upper levels of the
* tree that cannot be detected any other way . ( Besides all this , this is
* probably also useful as a direct test of the code used by index scans
* themselves . )
*/
static bool
bt_rootdescend ( BtreeCheckState * state , IndexTuple itup )
{
BTScanInsert key ;
BTStack stack ;
Buffer lbuf ;
bool exists ;
key = _bt_mkscankey ( state - > rel , itup ) ;
Assert ( key - > heapkeyspace & & key - > scantid ! = NULL ) ;
/*
* Search from root .
*
* Ideally , we would arrange to only move right within _bt_search ( ) when
* an interrupted page split is detected ( i . e . when the incomplete split
* bit is found to be set ) , but for now we accept the possibility that
* that could conceal an inconsistency .
*/
Assert ( state - > readonly & & state - > rootdescend ) ;
exists = false ;
stack = _bt_search ( state - > rel , key , & lbuf , BT_READ , NULL ) ;
if ( BufferIsValid ( lbuf ) )
{
BTInsertStateData insertstate ;
OffsetNumber offnum ;
Page page ;
insertstate . itup = itup ;
insertstate . itemsz = MAXALIGN ( IndexTupleSize ( itup ) ) ;
insertstate . itup_key = key ;
insertstate . bounds_valid = false ;
insertstate . buf = lbuf ;
/* Get matching tuple on leaf page */
offnum = _bt_binsrch_insert ( state - > rel , & insertstate ) ;
/* Compare first >= matching item on leaf page, if any */
page = BufferGetPage ( lbuf ) ;
if ( offnum < = PageGetMaxOffsetNumber ( page ) & &
_bt_compare ( state - > rel , key , page , offnum ) = = 0 )
exists = true ;
_bt_relbuf ( state - > rel , lbuf ) ;
}
_bt_freestack ( stack ) ;
pfree ( key ) ;
return exists ;
}
/*
* Is particular offset within page ( whose special state is passed by caller )
* the page negative - infinity item ?