@ -35,6 +35,7 @@
# include "storage/freespace.h"
# include "utils/acl.h"
# include "utils/builtins.h"
# include "utils/datum.h"
# include "utils/index_selfuncs.h"
# include "utils/memutils.h"
# include "utils/rel.h"
@ -77,7 +78,9 @@ static void form_and_insert_tuple(BrinBuildState *state);
static void union_tuples ( BrinDesc * bdesc , BrinMemTuple * a ,
BrinTuple * b ) ;
static void brin_vacuum_scan ( Relation idxrel , BufferAccessStrategy strategy ) ;
static bool add_values_to_range ( Relation idxRel , BrinDesc * bdesc ,
BrinMemTuple * dtup , Datum * values , bool * nulls ) ;
static bool check_null_keys ( BrinValues * bval , ScanKey * nullkeys , int nnullkeys ) ;
/*
* BRIN handler function : return IndexAmRoutine with access method parameters
@ -179,7 +182,6 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
OffsetNumber off ;
BrinTuple * brtup ;
BrinMemTuple * dtup ;
int keyno ;
CHECK_FOR_INTERRUPTS ( ) ;
@ -243,31 +245,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
dtup = brin_deform_tuple ( bdesc , brtup , NULL ) ;
/*
* Compare the key values of the new tuple to the stored index values ;
* our deformed tuple will get updated if the new tuple doesn ' t fit
* the original range ( note this means we can ' t break out of the loop
* early ) . Make a note of whether this happens , so that we know to
* insert the modified tuple later .
*/
for ( keyno = 0 ; keyno < bdesc - > bd_tupdesc - > natts ; keyno + + )
{
Datum result ;
BrinValues * bval ;
FmgrInfo * addValue ;
bval = & dtup - > bt_columns [ keyno ] ;
addValue = index_getprocinfo ( idxRel , keyno + 1 ,
BRIN_PROCNUM_ADDVALUE ) ;
result = FunctionCall4Coll ( addValue ,
idxRel - > rd_indcollation [ keyno ] ,
PointerGetDatum ( bdesc ) ,
PointerGetDatum ( bval ) ,
values [ keyno ] ,
nulls [ keyno ] ) ;
/* if that returned true, we need to insert the updated tuple */
need_insert | = DatumGetBool ( result ) ;
}
need_insert = add_values_to_range ( idxRel , bdesc , dtup , values , nulls ) ;
if ( ! need_insert )
{
@ -390,8 +368,10 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
BrinMemTuple * dtup ;
BrinTuple * btup = NULL ;
Size btupsz = 0 ;
ScanKey * * keys ;
int * nkeys ;
ScanKey * * keys ,
* * nullkeys ;
int * nkeys ,
* nnullkeys ;
int keyno ;
opaque = ( BrinOpaque * ) scan - > opaque ;
@ -420,13 +400,18 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
* keys , so we allocate space for all attributes . That may use more memory
* but it ' s probably cheaper than determining which attributes are used .
*
* We keep null and regular keys separate , so that we can pass just the
* regular keys to the consistent function easily .
*
* XXX The widest index can have 32 attributes , so the amount of wasted
* memory is negligible . We could invent a more compact approach ( with
* just space for used attributes ) but that would make the matching more
* complex so it ' s not a good trade - off .
*/
keys = palloc0 ( sizeof ( ScanKey * ) * bdesc - > bd_tupdesc - > natts ) ;
nullkeys = palloc0 ( sizeof ( ScanKey * ) * bdesc - > bd_tupdesc - > natts ) ;
nkeys = palloc0 ( sizeof ( int ) * bdesc - > bd_tupdesc - > natts ) ;
nnullkeys = palloc0 ( sizeof ( int ) * bdesc - > bd_tupdesc - > natts ) ;
/* Preprocess the scan keys - split them into per-attribute arrays. */
for ( keyno = 0 ; keyno < scan - > numberOfKeys ; keyno + + )
@ -444,23 +429,23 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
TupleDescAttr ( bdesc - > bd_tupdesc ,
keyattno - 1 ) - > attcollation ) ) ;
/* First time we see this attribute, so init the array of keys. */
if ( ! keys [ keyattno - 1 ] )
/*
* First time we see this index attribute , so init as needed .
*
* This is a bit of an overkill - we don ' t know how many scan keys are
* there for this attribute , so we simply allocate the largest number
* possible ( as if all keys were for this attribute ) . This may waste a
* bit of memory , but we only expect small number of scan keys in
* general , so this should be negligible , and repeated repalloc calls
* are not free either .
*/
if ( consistentFn [ keyattno - 1 ] . fn_oid = = InvalidOid )
{
FmgrInfo * tmp ;
/*
* This is a bit of an overkill - we don ' t know how many scan keys
* are there for this attribute , so we simply allocate the largest
* number possible ( as if all keys were for this attribute ) . This
* may waste a bit of memory , but we only expect small number of
* scan keys in general , so this should be negligible , and
* repeated repalloc calls are not free either .
*/
keys [ keyattno - 1 ] = palloc0 ( sizeof ( ScanKey ) * scan - > numberOfKeys ) ;
/* First time this column, so look up consistent function */
Assert ( consistentFn [ keyattno - 1 ] . fn_oid = = InvalidOid ) ;
/* No key/null arrays for this attribute. */
Assert ( ( keys [ keyattno - 1 ] = = NULL ) & & ( nkeys [ keyattno - 1 ] = = 0 ) ) ;
Assert ( ( nullkeys [ keyattno - 1 ] = = NULL ) & & ( nnullkeys [ keyattno - 1 ] = = 0 ) ) ;
tmp = index_getprocinfo ( idxRel , keyattno ,
BRIN_PROCNUM_CONSISTENT ) ;
@ -468,9 +453,23 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
CurrentMemoryContext ) ;
}
/* Add key to the per-attribute array. */
keys [ keyattno - 1 ] [ nkeys [ keyattno - 1 ] ] = key ;
nkeys [ keyattno - 1 ] + + ;
/* Add key to the proper per-attribute array. */
if ( key - > sk_flags & SK_ISNULL )
{
if ( ! nullkeys [ keyattno - 1 ] )
nullkeys [ keyattno - 1 ] = palloc0 ( sizeof ( ScanKey ) * scan - > numberOfKeys ) ;
nullkeys [ keyattno - 1 ] [ nnullkeys [ keyattno - 1 ] ] = key ;
nnullkeys [ keyattno - 1 ] + + ;
}
else
{
if ( ! keys [ keyattno - 1 ] )
keys [ keyattno - 1 ] = palloc0 ( sizeof ( ScanKey ) * scan - > numberOfKeys ) ;
keys [ keyattno - 1 ] [ nkeys [ keyattno - 1 ] ] = key ;
nkeys [ keyattno - 1 ] + + ;
}
}
/* allocate an initial in-memory tuple, out of the per-range memcxt */
@ -549,15 +548,58 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
Datum add ;
Oid collation ;
/* skip attributes without any scan keys */
if ( nkeys [ attno - 1 ] = = 0 )
/*
* skip attributes without any scan keys ( both regular and
* IS [ NOT ] NULL )
*/
if ( nkeys [ attno - 1 ] = = 0 & & nnullkeys [ attno - 1 ] = = 0 )
continue ;
bval = & dtup - > bt_columns [ attno - 1 ] ;
/*
* First check if there are any IS [ NOT ] NULL scan keys ,
* and if we ' re violating them . In that case we can
* terminate early , without invoking the support function .
*
* As there may be more keys , we can only detemine
* mismatch within this loop .
*/
if ( bdesc - > bd_info [ attno - 1 ] - > oi_regular_nulls & &
! check_null_keys ( bval , nullkeys [ attno - 1 ] ,
nnullkeys [ attno - 1 ] ) )
{
/*
* If any of the IS [ NOT ] NULL keys failed , the page
* range as a whole can ' t pass . So terminate the loop .
*/
addrange = false ;
break ;
}
/*
* So either there are no IS [ NOT ] NULL keys , or all
* passed . If there are no regular scan keys , we ' re done -
* the page range matches . If there are regular keys , but
* the page range is marked as ' all nulls ' it can ' t
* possibly pass ( we ' re assuming the operators are
* strict ) .
*/
/* No regular scan keys - page range as a whole passes. */
if ( ! nkeys [ attno - 1 ] )
continue ;
Assert ( ( nkeys [ attno - 1 ] > 0 ) & &
( nkeys [ attno - 1 ] < = scan - > numberOfKeys ) ) ;
/* If it is all nulls, it cannot possibly be consistent. */
if ( bval - > bv_allnulls )
{
addrange = false ;
break ;
}
/*
* Check whether the scan key is consistent with the page
* range values ; if so , have the pages in the range added
@ -663,7 +705,6 @@ brinbuildCallback(Relation index,
{
BrinBuildState * state = ( BrinBuildState * ) brstate ;
BlockNumber thisblock ;
int i ;
thisblock = ItemPointerGetBlockNumber ( tid ) ;
@ -692,25 +733,8 @@ brinbuildCallback(Relation index,
}
/* Accumulate the current tuple into the running state */
for ( i = 0 ; i < state - > bs_bdesc - > bd_tupdesc - > natts ; i + + )
{
FmgrInfo * addValue ;
BrinValues * col ;
Form_pg_attribute attr = TupleDescAttr ( state - > bs_bdesc - > bd_tupdesc , i ) ;
col = & state - > bs_dtuple - > bt_columns [ i ] ;
addValue = index_getprocinfo ( index , i + 1 ,
BRIN_PROCNUM_ADDVALUE ) ;
/*
* Update dtuple state , if and as necessary .
*/
FunctionCall4Coll ( addValue ,
attr - > attcollation ,
PointerGetDatum ( state - > bs_bdesc ) ,
PointerGetDatum ( col ) ,
values [ i ] , isnull [ i ] ) ;
}
( void ) add_values_to_range ( index , state - > bs_bdesc , state - > bs_dtuple ,
values , isnull ) ;
}
/*
@ -1489,6 +1513,39 @@ union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
FmgrInfo * unionFn ;
BrinValues * col_a = & a - > bt_columns [ keyno ] ;
BrinValues * col_b = & db - > bt_columns [ keyno ] ;
BrinOpcInfo * opcinfo = bdesc - > bd_info [ keyno ] ;
if ( opcinfo - > oi_regular_nulls )
{
/* Adjust "hasnulls". */
if ( ! col_a - > bv_hasnulls & & col_b - > bv_hasnulls )
col_a - > bv_hasnulls = true ;
/* If there are no values in B, there's nothing left to do. */
if ( col_b - > bv_allnulls )
continue ;
/*
* Adjust " allnulls " . If A doesn ' t have values , just copy the
* values from B into A , and we ' re done . We cannot run the
* operators in this case , because values in A might contain
* garbage . Note we already established that B contains values .
*/
if ( col_a - > bv_allnulls )
{
int i ;
col_a - > bv_allnulls = false ;
for ( i = 0 ; i < opcinfo - > oi_nstored ; i + + )
col_a - > bv_values [ i ] =
datumCopy ( col_b - > bv_values [ i ] ,
opcinfo - > oi_typcache [ i ] - > typbyval ,
opcinfo - > oi_typcache [ i ] - > typlen ) ;
continue ;
}
}
unionFn = index_getprocinfo ( bdesc - > bd_index , keyno + 1 ,
BRIN_PROCNUM_UNION ) ;
@ -1542,3 +1599,103 @@ brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy)
*/
FreeSpaceMapVacuum ( idxrel ) ;
}
static bool
add_values_to_range ( Relation idxRel , BrinDesc * bdesc , BrinMemTuple * dtup ,
Datum * values , bool * nulls )
{
int keyno ;
bool modified = false ;
/*
* Compare the key values of the new tuple to the stored index values ; our
* deformed tuple will get updated if the new tuple doesn ' t fit the
* original range ( note this means we can ' t break out of the loop early ) .
* Make a note of whether this happens , so that we know to insert the
* modified tuple later .
*/
for ( keyno = 0 ; keyno < bdesc - > bd_tupdesc - > natts ; keyno + + )
{
Datum result ;
BrinValues * bval ;
FmgrInfo * addValue ;
bval = & dtup - > bt_columns [ keyno ] ;
if ( bdesc - > bd_info [ keyno ] - > oi_regular_nulls & & nulls [ keyno ] )
{
/*
* If the new value is null , we record that we saw it if it ' s the
* first one ; otherwise , there ' s nothing to do .
*/
if ( ! bval - > bv_hasnulls )
{
bval - > bv_hasnulls = true ;
modified = true ;
}
continue ;
}
addValue = index_getprocinfo ( idxRel , keyno + 1 ,
BRIN_PROCNUM_ADDVALUE ) ;
result = FunctionCall4Coll ( addValue ,
idxRel - > rd_indcollation [ keyno ] ,
PointerGetDatum ( bdesc ) ,
PointerGetDatum ( bval ) ,
values [ keyno ] ,
nulls [ keyno ] ) ;
/* if that returned true, we need to insert the updated tuple */
modified | = DatumGetBool ( result ) ;
}
return modified ;
}
static bool
check_null_keys ( BrinValues * bval , ScanKey * nullkeys , int nnullkeys )
{
int keyno ;
/*
* First check if there are any IS [ NOT ] NULL scan keys , and if we ' re
* violating them .
*/
for ( keyno = 0 ; keyno < nnullkeys ; keyno + + )
{
ScanKey key = nullkeys [ keyno ] ;
Assert ( key - > sk_attno = = bval - > bv_attno ) ;
/* Handle only IS NULL/IS NOT NULL tests */
if ( ! ( key - > sk_flags & SK_ISNULL ) )
continue ;
if ( key - > sk_flags & SK_SEARCHNULL )
{
/* IS NULL scan key, but range has no NULLs */
if ( ! bval - > bv_allnulls & & ! bval - > bv_hasnulls )
return false ;
}
else if ( key - > sk_flags & SK_SEARCHNOTNULL )
{
/*
* For IS NOT NULL , we can only skip ranges that are known to have
* only nulls .
*/
if ( bval - > bv_allnulls )
return false ;
}
else
{
/*
* Neither IS NULL nor IS NOT NULL was used ; assume all indexable
* operators are strict and thus return false with NULL value in
* the scan key .
*/
return false ;
}
}
return true ;
}