@ -366,6 +366,9 @@ struct Tuplesortstate
char * slabMemoryEnd ; /* end of slab memory arena */
SlabSlot * slabFreeHead ; /* head of free list */
/* Buffer size to use for reading input tapes, during merge. */
size_t read_buffer_size ;
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX , that
* came from a tape ( that is , in TSS_SORTEDONTAPE or TSS_FINALMERGE
@ -579,7 +582,6 @@ static bool useselection(Tuplesortstate *state);
static void inittapes ( Tuplesortstate * state ) ;
static void selectnewtape ( Tuplesortstate * state ) ;
static void init_slab_allocator ( Tuplesortstate * state , int numSlots ) ;
static void init_tape_buffers ( Tuplesortstate * state , int numInputTapes ) ;
static void mergeruns ( Tuplesortstate * state ) ;
static void mergeonerun ( Tuplesortstate * state ) ;
static void beginmerge ( Tuplesortstate * state ) ;
@ -2056,7 +2058,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* end of the sort anyway , but better to release the
* memory early .
*/
LogicalTapeRewind ( state - > tapeset , srcTape , tru e ) ;
LogicalTapeRewindForWrite ( state - > tapeset , srcTape ) ;
return true ;
}
newtup . tupindex = srcTape ;
@ -2511,72 +2513,6 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
state - > slabAllocatorUsed = true ;
}
/*
* Divide all remaining work memory ( availMem ) as read buffers , for all
* the tapes that will be used during the merge .
*
* We use the number of possible * input * tapes here , rather than maxTapes ,
* for the calculation . At all times , we ' ll be reading from at most
* numInputTapes tapes , and one tape is used for output ( unless we do an
* on - the - fly final merge , in which case we don ' t have an output tape ) .
*/
static void
init_tape_buffers ( Tuplesortstate * state , int numInputTapes )
{
int64 availBlocks ;
int64 blocksPerTape ;
int remainder ;
int tapenum ;
/*
* Divide availMem evenly among the number of input tapes .
*/
availBlocks = state - > availMem / BLCKSZ ;
blocksPerTape = availBlocks / numInputTapes ;
remainder = availBlocks % numInputTapes ;
USEMEM ( state , availBlocks * BLCKSZ ) ;
# ifdef TRACE_SORT
if ( trace_sort )
elog ( LOG , " using " INT64_FORMAT " KB of memory for read buffers among %d input tapes " ,
( availBlocks * BLCKSZ ) / 1024 , numInputTapes ) ;
# endif
/*
* Use one page per tape , even if we are out of memory .
* tuplesort_merge_order ( ) should ' ve chosen the number of tapes so that
* this can ' t happen , but better safe than sorry . ( This also protects
* from a negative availMem . )
*/
if ( blocksPerTape < 1 )
{
blocksPerTape = 1 ;
remainder = 0 ;
}
/*
* Set the buffers for the tapes .
*
* In a multi - phase merge , the tape that is initially used as an output
* tape , will later be rewound and read from , and should also use a large
* buffer at that point . So we must loop up to maxTapes , not just
* numInputTapes !
*
* If there are fewer runs than tapes , we will set the buffer size also
* for tapes that will go completely unused , but that ' s harmless .
* LogicalTapeAssignReadBufferSize ( ) doesn ' t allocate the buffer
* immediately , it just sets the size that will be used , when the tape is
* rewound for read , and the tape isn ' t empty .
*/
for ( tapenum = 0 ; tapenum < state - > maxTapes ; tapenum + + )
{
int64 numBlocks = blocksPerTape + ( tapenum < remainder ? 1 : 0 ) ;
LogicalTapeAssignReadBufferSize ( state - > tapeset , tapenum ,
numBlocks * BLCKSZ ) ;
}
}
/*
* mergeruns - - merge all the completed initial runs .
*
@ -2679,25 +2615,32 @@ mergeruns(Tuplesortstate *state)
}
/*
* Use all the spare memory we have available for read buffers for the
* tapes .
* Use all the spare memory we have available for read buffers among the
* input tapes .
*
* We do this only after checking for the case that we produced only one
* initial run , because there is no need to use a large read buffer when
* we ' re reading from a single tape . With one tape , the I / O pattern will
* be the same regardless of the buffer size .
*
* We don ' t try to " rebalance " the amount of memory among tapes , when we
* start a new merge phase , even if some tapes can be inactive in the
* phase . That would be hard , because logtape . c doesn ' t know where one
* run ends and another begins . When a new merge phase begins , and a tape
* doesn ' t participate in it , its buffer nevertheless already contains
* tuples from t he next run on same tape , so we cannot release the buffer .
* That ' s OK in practice , merge performance isn ' t that sensitive to the
* amount of buffers used , and most merge phases use all or almost all
* tapes , anyway .
* We don ' t try to " rebalance " the memory among tapes , when we start a new
* merge phase , even if some tapes are inactive in the new phase . That
* would be hard , because logtape . c doesn ' t know where one run ends and
* another begins . When a new merge phase begins , and a tape doesn ' t
* participate in it , its buffer nevertheless already contains tuples from
* the next run on same tape , so we cannot release the buffer . That ' s OK
* in practice , merge performance isn ' t that sensitive to the amount of
* buffers used , and most merge phases use all or almost all tapes ,
* anyway .
*/
init_tape_buffers ( state , numInputTapes ) ;
# ifdef TRACE_SORT
if ( trace_sort )
elog ( LOG , " using " INT64_FORMAT " KB of memory for read buffers among %d input tapes " ,
( state - > availMem ) / 1024 , numInputTapes ) ;
# endif
state - > read_buffer_size = state - > availMem / numInputTapes ;
USEMEM ( state , state - > availMem ) ;
/*
* Allocate a new ' memtuples ' array , for the heap . It will hold one tuple
@ -2709,7 +2652,7 @@ mergeruns(Tuplesortstate *state)
/* End of step D2: rewind all output tapes to prepare for merging */
for ( tapenum = 0 ; tapenum < state - > tapeRange ; tapenum + + )
LogicalTapeRewind ( state - > tapeset , tapenum , fals e) ;
LogicalTapeRewindForRead ( state - > tapeset , tapenum , state - > read_buffer_siz e) ;
for ( ; ; )
{
@ -2772,11 +2715,10 @@ mergeruns(Tuplesortstate *state)
if ( - - state - > Level = = 0 )
break ;
/* rewind output tape T to use as new input */
LogicalTapeRewind ( state - > tapeset , state - > tp_tapenum [ state - > tapeRange ] ,
fals e ) ;
LogicalTapeRewindForRead ( state - > tapeset , state - > tp_tapenum [ state - > tapeRange ] ,
state - > read_buffer_siz e ) ;
/* rewind used-up input tape P, and prepare it for write pass */
LogicalTapeRewind ( state - > tapeset , state - > tp_tapenum [ state - > tapeRange - 1 ] ,
true ) ;
LogicalTapeRewindForWrite ( state - > tapeset , state - > tp_tapenum [ state - > tapeRange - 1 ] ) ;
state - > tp_runs [ state - > tapeRange - 1 ] = 0 ;
/*
@ -2812,7 +2754,7 @@ mergeruns(Tuplesortstate *state)
for ( tapenum = 0 ; tapenum < state - > maxTapes ; tapenum + + )
{
if ( tapenum ! = state - > result_tape )
LogicalTapeRewind ( state - > tapeset , tapenum , true ) ;
LogicalTapeRewindForWrite ( state - > tapeset , tapenum ) ;
}
}
@ -3174,9 +3116,9 @@ tuplesort_rescan(Tuplesortstate *state)
state - > markpos_eof = false ;
break ;
case TSS_SORTEDONTAPE :
LogicalTapeRewind ( state - > tapeset ,
state - > result_tape ,
false ) ;
LogicalTapeRewindForRead ( state - > tapeset ,
state - > result_tape ,
0 ) ;
state - > eof_reached = false ;
state - > markpos_block = 0L ;
state - > markpos_offset = 0 ;