Use the sortsupport infrastructure in more cases.

This removes some fmgr overhead from cases such as btree index builds.

Peter Geoghegan, reviewed by Andreas Karlsson and me.
pull/14/head
Robert Haas 11 years ago
parent 99e8f08fab
commit 5ea86e6e65
  1. 63
      src/backend/access/nbtree/nbtsort.c
  2. 79
      src/backend/utils/sort/sortsupport.c
  3. 275
      src/backend/utils/sort/tuplesort.c
  4. 3
      src/include/utils/sortsupport.h

@ -686,6 +686,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
int i,
keysz = RelationGetNumberOfAttributes(wstate->index);
ScanKey indexScanKey = NULL;
SortSupport sortKeys;
if (merge)
{
@ -701,6 +702,31 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
true, &should_free2);
indexScanKey = _bt_mkscankey_nodata(wstate->index);
/* Prepare SortSupport data for each column */
sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
for (i = 0; i < keysz; i++)
{
SortSupport sortKey = sortKeys + i;
ScanKey scanKey = indexScanKey + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
for (;;)
{
load1 = true; /* load BTSpool next ? */
@ -713,43 +739,20 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
{
for (i = 1; i <= keysz; i++)
{
ScanKey entry;
SortSupport entry;
Datum attrDatum1,
attrDatum2;
bool isNull1,
isNull2;
int32 compare;
entry = indexScanKey + i - 1;
entry = sortKeys + i - 1;
attrDatum1 = index_getattr(itup, i, tupdes, &isNull1);
attrDatum2 = index_getattr(itup2, i, tupdes, &isNull2);
if (isNull1)
{
if (isNull2)
compare = 0; /* NULL "=" NULL */
else if (entry->sk_flags & SK_BT_NULLS_FIRST)
compare = -1; /* NULL "<" NOT_NULL */
else
compare = 1; /* NULL ">" NOT_NULL */
}
else if (isNull2)
{
if (entry->sk_flags & SK_BT_NULLS_FIRST)
compare = 1; /* NOT_NULL ">" NULL */
else
compare = -1; /* NOT_NULL "<" NULL */
}
else
{
compare =
DatumGetInt32(FunctionCall2Coll(&entry->sk_func,
entry->sk_collation,
attrDatum1,
attrDatum2));
if (entry->sk_flags & SK_BT_DESC)
compare = -compare;
}
compare = ApplySortComparator(attrDatum1, isNull1,
attrDatum2, isNull2,
entry);
if (compare > 0)
{
load1 = false;
@ -783,7 +786,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
true, &should_free2);
}
}
_bt_freeskey(indexScanKey);
pfree(sortKeys);
}
else
{

@ -21,6 +21,7 @@
#include "access/nbtree.h"
#include "fmgr.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/sortsupport.h"
@ -86,28 +87,14 @@ PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup)
}
/*
* Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
*
* Caller must previously have zeroed the SortSupportData structure and then
* filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill
* in ssup_reverse as well as the comparator function pointer.
* Look up and call sortsupport function to setup SortSupport comparator;
* or if no such function exists or it declines to set up the appropriate
* state, prepare a suitable shim.
*/
void
PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
static void
FinishSortSupportFunction(Oid opfamily, Oid opcintype, SortSupport ssup)
{
Oid sortSupportFunction;
Oid opfamily;
Oid opcintype;
int16 strategy;
Assert(ssup->comparator == NULL);
/* Find the operator in pg_amop */
if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
&strategy))
elog(ERROR, "operator %u is not a valid ordering operator",
orderingOp);
ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
/* Look for a sort support function */
sortSupportFunction = get_opfamily_proc(opfamily, opcintype, opcintype,
@ -136,3 +123,57 @@ PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
PrepareSortSupportComparisonShim(sortFunction, ssup);
}
}
/*
* Fill in SortSupport given an ordering operator (btree "<" or ">" operator).
*
* Caller must previously have zeroed the SortSupportData structure and then
* filled in ssup_cxt, ssup_collation, and ssup_nulls_first. This will fill
* in ssup_reverse as well as the comparator function pointer.
*/
void
PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup)
{
Oid opfamily;
Oid opcintype;
int16 strategy;
Assert(ssup->comparator == NULL);
/* Find the operator in pg_amop */
if (!get_ordering_op_properties(orderingOp, &opfamily, &opcintype,
&strategy))
elog(ERROR, "operator %u is not a valid ordering operator",
orderingOp);
ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
FinishSortSupportFunction(opfamily, opcintype, ssup);
}
/*
* Fill in SortSupport given an index relation, attribute, and strategy.
*
* Caller must previously have zeroed the SortSupportData structure and then
* filled in ssup_cxt, ssup_attno, ssup_collation, and ssup_nulls_first. This
* will fill in ssup_reverse (based on the supplied strategy), as well as the
* comparator function pointer.
*/
void
PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
SortSupport ssup)
{
Oid opfamily = indexRel->rd_opfamily[ssup->ssup_attno - 1];
Oid opcintype = indexRel->rd_opcintype[ssup->ssup_attno - 1];
Assert(ssup->comparator == NULL);
/* Find the operator in pg_amop */
if (indexRel->rd_rel->relam != BTREE_AM_OID)
elog(ERROR, "unexpected non-btree AM: %u", indexRel->rd_rel->relam);
if (strategy != BTGreaterStrategyNumber &&
strategy != BTLessStrategyNumber)
elog(ERROR, "unexpected sort support strategy: %d", strategy);
ssup->ssup_reverse = (strategy == BTGreaterStrategyNumber);
FinishSortSupportFunction(opfamily, opcintype, ssup);
}

@ -256,13 +256,6 @@ struct Tuplesortstate
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
/*
* Function to reverse the sort direction from its current state. (We
* could dispense with this if we wanted to enforce that all variants
* represent the sort key information alike.)
*/
void (*reversedirection) (Tuplesortstate *state);
/*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
@ -340,8 +333,9 @@ struct Tuplesortstate
bool markpos_eof; /* saved "eof_reached" */
/*
* These variables are specific to the MinimalTuple case; they are set by
* tuplesort_begin_heap and used only by the MinimalTuple routines.
* The sortKeys variable is used by every case other than the hash index
* case; it is set by tuplesort_begin_xxx. tupDesc is only used by the
* MinimalTuple and CLUSTER routines, though.
*/
TupleDesc tupDesc;
SortSupport sortKeys; /* array of length nKeys */
@ -354,8 +348,7 @@ struct Tuplesortstate
/*
* These variables are specific to the CLUSTER case; they are set by
* tuplesort_begin_cluster. Note CLUSTER also uses tupDesc and
* indexScanKey.
* tuplesort_begin_cluster.
*/
IndexInfo *indexInfo; /* info about index being used for reference */
EState *estate; /* for evaluating index expressions */
@ -368,7 +361,6 @@ struct Tuplesortstate
Relation indexRel; /* index being built */
/* These are specific to the index_btree subcase: */
ScanKey indexScanKey;
bool enforceUnique; /* complain if we find duplicate tuples */
/* These are specific to the index_hash subcase: */
@ -395,7 +387,6 @@ struct Tuplesortstate
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
#define REVERSEDIRECTION(state) ((*(state)->reversedirection) (state))
#define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
@ -464,6 +455,7 @@ static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
@ -473,7 +465,6 @@ static void writetup_heap(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void reversedirection_heap(Tuplesortstate *state);
static int comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
@ -490,8 +481,6 @@ static void writetup_index(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void reversedirection_index_btree(Tuplesortstate *state);
static void reversedirection_index_hash(Tuplesortstate *state);
static int comparetup_datum(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
@ -499,7 +488,6 @@ static void writetup_datum(Tuplesortstate *state, int tapenum,
SortTuple *stup);
static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len);
static void reversedirection_datum(Tuplesortstate *state);
static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
/*
@ -629,7 +617,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
state->copytup = copytup_heap;
state->writetup = writetup_heap;
state->readtup = readtup_heap;
state->reversedirection = reversedirection_heap;
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
@ -665,7 +652,9 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
int workMem, bool randomAccess)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext;
int i;
Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
@ -691,13 +680,13 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
state->copytup = copytup_cluster;
state->writetup = writetup_cluster;
state->readtup = readtup_cluster;
state->reversedirection = reversedirection_index_btree;
state->indexInfo = BuildIndexInfo(indexRel);
state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
indexScanKey = _bt_mkscankey_nodata(indexRel);
if (state->indexInfo->ii_Expressions != NULL)
{
TupleTableSlot *slot;
@ -715,6 +704,32 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
econtext->ecxt_scantuple = slot;
}
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
sizeof(SortSupportData));
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
ScanKey scanKey = indexScanKey + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
MemoryContextSwitchTo(oldcontext);
return state;
@ -727,7 +742,9 @@ tuplesort_begin_index_btree(Relation heapRel,
int workMem, bool randomAccess)
{
Tuplesortstate *state = tuplesort_begin_common(workMem, randomAccess);
ScanKey indexScanKey;
MemoryContext oldcontext;
int i;
oldcontext = MemoryContextSwitchTo(state->sortcontext);
@ -751,13 +768,40 @@ tuplesort_begin_index_btree(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->reversedirection = reversedirection_index_btree;
state->heapRel = heapRel;
state->indexRel = indexRel;
state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->enforceUnique = enforceUnique;
indexScanKey = _bt_mkscankey_nodata(indexRel);
state->nKeys = RelationGetNumberOfAttributes(indexRel);
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
sizeof(SortSupportData));
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
ScanKey scanKey = indexScanKey + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
sortKey->ssup_collation = scanKey->sk_collation;
sortKey->ssup_nulls_first =
(scanKey->sk_flags & SK_BT_NULLS_FIRST) != 0;
sortKey->ssup_attno = scanKey->sk_attno;
AssertState(sortKey->ssup_attno != 0);
strategy = (scanKey->sk_flags & SK_BT_DESC) != 0 ?
BTGreaterStrategyNumber : BTLessStrategyNumber;
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
MemoryContextSwitchTo(oldcontext);
return state;
@ -788,7 +832,6 @@ tuplesort_begin_index_hash(Relation heapRel,
state->copytup = copytup_index;
state->writetup = writetup_index;
state->readtup = readtup_index;
state->reversedirection = reversedirection_index_hash;
state->heapRel = heapRel;
state->indexRel = indexRel;
@ -831,7 +874,6 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
state->copytup = copytup_datum;
state->writetup = writetup_datum;
state->readtup = readtup_datum;
state->reversedirection = reversedirection_datum;
state->datumType = datumType;
@ -2599,7 +2641,7 @@ make_bounded_heap(Tuplesortstate *state)
Assert(tupcount >= state->bound);
/* Reverse sort direction so largest entry will be at root */
REVERSEDIRECTION(state);
reversedirection(state);
state->memtupcount = 0; /* make the heap empty */
for (i = 0; i < tupcount; i++)
@ -2663,7 +2705,7 @@ sort_bounded_heap(Tuplesortstate *state)
* Reverse sort direction back to the original state. This is not
* actually necessary but seems like a good idea for tidiness.
*/
REVERSEDIRECTION(state);
reversedirection(state);
state->status = TSS_SORTEDINMEM;
state->boundUsed = true;
@ -2753,6 +2795,24 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
memtuples[i] = *tuple;
}
/*
* Function to reverse the sort direction from its current state
*
* It is not safe to call this when performing hash tuplesorts
*/
static void
reversedirection(Tuplesortstate *state)
{
SortSupport sortKey = state->sortKeys;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
{
sortKey->ssup_reverse = !sortKey->ssup_reverse;
sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
}
}
/*
* Tape interface routines
@ -2780,73 +2840,6 @@ markrunend(Tuplesortstate *state, int tapenum)
}
/*
* Inline-able copy of FunctionCall2Coll() to save some cycles in sorting.
*/
static inline Datum
myFunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
{
FunctionCallInfoData fcinfo;
Datum result;
InitFunctionCallInfoData(fcinfo, flinfo, 2, collation, NULL, NULL);
fcinfo.arg[0] = arg1;
fcinfo.arg[1] = arg2;
fcinfo.argnull[0] = false;
fcinfo.argnull[1] = false;
result = FunctionCallInvoke(&fcinfo);
/* Check for null result, since caller is clearly not expecting one */
if (fcinfo.isnull)
elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
return result;
}
/*
* Apply a sort function (by now converted to fmgr lookup form)
* and return a 3-way comparison result. This takes care of handling
* reverse-sort and NULLs-ordering properly. We assume that DESC and
* NULLS_FIRST options are encoded in sk_flags the same way btree does it.
*/
static inline int32
inlineApplySortFunction(FmgrInfo *sortFunction, int sk_flags, Oid collation,
Datum datum1, bool isNull1,
Datum datum2, bool isNull2)
{
int32 compare;
if (isNull1)
{
if (isNull2)
compare = 0; /* NULL "=" NULL */
else if (sk_flags & SK_BT_NULLS_FIRST)
compare = -1; /* NULL "<" NOT_NULL */
else
compare = 1; /* NULL ">" NOT_NULL */
}
else if (isNull2)
{
if (sk_flags & SK_BT_NULLS_FIRST)
compare = 1; /* NOT_NULL ">" NULL */
else
compare = -1; /* NOT_NULL "<" NULL */
}
else
{
compare = DatumGetInt32(myFunctionCall2Coll(sortFunction, collation,
datum1, datum2));
if (sk_flags & SK_BT_DESC)
compare = -compare;
}
return compare;
}
/*
* Routines specialized for HeapTuple (actually MinimalTuple) case
*/
@ -2972,20 +2965,6 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
static void
reversedirection_heap(Tuplesortstate *state)
{
SortSupport sortKey = state->sortKeys;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++, sortKey++)
{
sortKey->ssup_reverse = !sortKey->ssup_reverse;
sortKey->ssup_nulls_first = !sortKey->ssup_nulls_first;
}
}
/*
* Routines specialized for the CLUSTER case (HeapTuple data, with
* comparisons per a btree index definition)
@ -2995,7 +2974,7 @@ static int
comparetup_cluster(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state)
{
ScanKey scanKey = state->indexScanKey;
SortSupport sortKey = state->sortKeys;
HeapTuple ltup;
HeapTuple rtup;
TupleDesc tupDesc;
@ -3005,14 +2984,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
/* Compare the leading sort key, if it's simple */
if (state->indexInfo->ii_KeyAttrNumbers[0] != 0)
{
compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
scanKey->sk_collation,
a->datum1, a->isnull1,
b->datum1, b->isnull1);
compare = ApplySortComparator(a->datum1, a->isnull1,
b->datum1, b->isnull1,
sortKey);
if (compare != 0 || state->nKeys == 1)
return compare;
/* Compare additional columns the hard way */
scanKey++;
sortKey++;
nkey = 1;
}
else
@ -3030,7 +3008,7 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
/* If not expression index, just compare the proper heap attrs */
tupDesc = state->tupDesc;
for (; nkey < state->nKeys; nkey++, scanKey++)
for (; nkey < state->nKeys; nkey++, sortKey++)
{
AttrNumber attno = state->indexInfo->ii_KeyAttrNumbers[nkey];
Datum datum1,
@ -3041,11 +3019,9 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
compare = inlineApplySortFunction(&scanKey->sk_func,
scanKey->sk_flags,
scanKey->sk_collation,
datum1, isnull1,
datum2, isnull2);
compare = ApplySortComparator(datum1, isnull1,
datum2, isnull2,
sortKey);
if (compare != 0)
return compare;
}
@ -3077,15 +3053,13 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b,
FormIndexDatum(state->indexInfo, ecxt_scantuple, state->estate,
r_index_values, r_index_isnull);
for (; nkey < state->nKeys; nkey++, scanKey++)
for (; nkey < state->nKeys; nkey++, sortKey++)
{
compare = inlineApplySortFunction(&scanKey->sk_func,
scanKey->sk_flags,
scanKey->sk_collation,
l_index_values[nkey],
l_index_isnull[nkey],
r_index_values[nkey],
r_index_isnull[nkey]);
compare = ApplySortComparator(l_index_values[nkey],
l_index_isnull[nkey],
r_index_values[nkey],
r_index_isnull[nkey],
sortKey);
if (compare != 0)
return compare;
}
@ -3180,7 +3154,7 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
* is also special handling for enforcing uniqueness, and special treatment
* for equal keys at the end.
*/
ScanKey scanKey = state->indexScanKey;
SortSupport sortKey = state->sortKeys;
IndexTuple tuple1;
IndexTuple tuple2;
int keysz;
@ -3190,10 +3164,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
int32 compare;
/* Compare the leading sort key */
compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
scanKey->sk_collation,
a->datum1, a->isnull1,
b->datum1, b->isnull1);
compare = ApplySortComparator(a->datum1, a->isnull1,
b->datum1, b->isnull1,
sortKey);
if (compare != 0)
return compare;
@ -3206,8 +3179,8 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
tuple2 = (IndexTuple) b->tuple;
keysz = state->nKeys;
tupDes = RelationGetDescr(state->indexRel);
scanKey++;
for (nkey = 2; nkey <= keysz; nkey++, scanKey++)
sortKey++;
for (nkey = 2; nkey <= keysz; nkey++, sortKey++)
{
Datum datum1,
datum2;
@ -3217,10 +3190,9 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b,
datum1 = index_getattr(tuple1, nkey, tupDes, &isnull1);
datum2 = index_getattr(tuple2, nkey, tupDes, &isnull2);
compare = inlineApplySortFunction(&scanKey->sk_func, scanKey->sk_flags,
scanKey->sk_collation,
datum1, isnull1,
datum2, isnull2);
compare = ApplySortComparator(datum1, isnull1,
datum2, isnull2,
sortKey);
if (compare != 0)
return compare; /* done when we find unequal attributes */
@ -3394,26 +3366,6 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
&stup->isnull1);
}
static void
reversedirection_index_btree(Tuplesortstate *state)
{
ScanKey scanKey = state->indexScanKey;
int nkey;
for (nkey = 0; nkey < state->nKeys; nkey++, scanKey++)
{
scanKey->sk_flags ^= (SK_BT_DESC | SK_BT_NULLS_FIRST);
}
}
static void
reversedirection_index_hash(Tuplesortstate *state)
{
/* We don't support reversing direction in a hash index sort */
elog(ERROR, "reversedirection_index_hash is not implemented");
}
/*
* Routines specialized for DatumTuple case
*/
@ -3512,13 +3464,6 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
&tuplen, sizeof(tuplen));
}
static void
reversedirection_datum(Tuplesortstate *state)
{
state->onlyKey->ssup_reverse = !state->onlyKey->ssup_reverse;
state->onlyKey->ssup_nulls_first = !state->onlyKey->ssup_nulls_first;
}
/*
* Convenience routine to free a tuple previously loaded into sort memory
*/

@ -48,6 +48,7 @@
#define SORTSUPPORT_H
#include "access/attnum.h"
#include "utils/relcache.h"
typedef struct SortSupportData *SortSupport;
@ -152,5 +153,7 @@ ApplySortComparator(Datum datum1, bool isNull1,
/* Other functions in utils/sort/sortsupport.c */
extern void PrepareSortSupportComparisonShim(Oid cmpFunc, SortSupport ssup);
extern void PrepareSortSupportFromOrderingOp(Oid orderingOp, SortSupport ssup);
extern void PrepareSortSupportFromIndexRel(Relation indexRel, int16 strategy,
SortSupport ssup);
#endif /* SORTSUPPORT_H */

Loading…
Cancel
Save