|
|
|
|
@ -223,6 +223,68 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = |
|
|
|
|
* ---------------------------------------------------------------- |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Streaming read API callback for parallel sequential scans. Returns the next |
|
|
|
|
* block the caller wants from the read stream or InvalidBlockNumber when done. |
|
|
|
|
*/ |
|
|
|
|
static BlockNumber |
|
|
|
|
heap_scan_stream_read_next_parallel(ReadStream *stream, |
|
|
|
|
void *callback_private_data, |
|
|
|
|
void *per_buffer_data) |
|
|
|
|
{ |
|
|
|
|
HeapScanDesc scan = (HeapScanDesc) callback_private_data; |
|
|
|
|
|
|
|
|
|
Assert(ScanDirectionIsForward(scan->rs_dir)); |
|
|
|
|
Assert(scan->rs_base.rs_parallel); |
|
|
|
|
|
|
|
|
|
if (unlikely(!scan->rs_inited)) |
|
|
|
|
{ |
|
|
|
|
/* parallel scan */ |
|
|
|
|
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, |
|
|
|
|
scan->rs_parallelworkerdata, |
|
|
|
|
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); |
|
|
|
|
|
|
|
|
|
/* may return InvalidBlockNumber if there are no more blocks */ |
|
|
|
|
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, |
|
|
|
|
scan->rs_parallelworkerdata, |
|
|
|
|
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); |
|
|
|
|
scan->rs_inited = true; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, |
|
|
|
|
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc) |
|
|
|
|
scan->rs_base.rs_parallel); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return scan->rs_prefetch_block; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Streaming read API callback for serial sequential and TID range scans. |
|
|
|
|
* Returns the next block the caller wants from the read stream or |
|
|
|
|
* InvalidBlockNumber when done. |
|
|
|
|
*/ |
|
|
|
|
static BlockNumber |
|
|
|
|
heap_scan_stream_read_next_serial(ReadStream *stream, |
|
|
|
|
void *callback_private_data, |
|
|
|
|
void *per_buffer_data) |
|
|
|
|
{ |
|
|
|
|
HeapScanDesc scan = (HeapScanDesc) callback_private_data; |
|
|
|
|
|
|
|
|
|
if (unlikely(!scan->rs_inited)) |
|
|
|
|
{ |
|
|
|
|
scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); |
|
|
|
|
scan->rs_inited = true; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
scan->rs_prefetch_block = heapgettup_advance_block(scan, |
|
|
|
|
scan->rs_prefetch_block, |
|
|
|
|
scan->rs_dir); |
|
|
|
|
|
|
|
|
|
return scan->rs_prefetch_block; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* ----------------
|
|
|
|
|
* initscan - scan code common to heap_beginscan and heap_rescan |
|
|
|
|
* ---------------- |
|
|
|
|
@ -325,6 +387,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) |
|
|
|
|
scan->rs_cbuf = InvalidBuffer; |
|
|
|
|
scan->rs_cblock = InvalidBlockNumber; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Initialize to ForwardScanDirection because it is most common and |
|
|
|
|
* because heap scans go forward before going backward (e.g. CURSORs). |
|
|
|
|
*/ |
|
|
|
|
scan->rs_dir = ForwardScanDirection; |
|
|
|
|
scan->rs_prefetch_block = InvalidBlockNumber; |
|
|
|
|
|
|
|
|
|
/* page-at-a-time fields are always invalid when not rs_inited */ |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -508,12 +577,14 @@ heap_prepare_pagescan(TableScanDesc sscan) |
|
|
|
|
/*
|
|
|
|
|
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM. |
|
|
|
|
* |
|
|
|
|
* Read the next block of the scan relation into a buffer and pin that buffer |
|
|
|
|
* before saving it in the scan descriptor. |
|
|
|
|
* Read the next block of the scan relation from the read stream and save it |
|
|
|
|
* in the scan descriptor. It is already pinned. |
|
|
|
|
*/ |
|
|
|
|
static inline void |
|
|
|
|
heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) |
|
|
|
|
{ |
|
|
|
|
Assert(scan->rs_read_stream); |
|
|
|
|
|
|
|
|
|
/* release previous scan buffer, if any */ |
|
|
|
|
if (BufferIsValid(scan->rs_cbuf)) |
|
|
|
|
{ |
|
|
|
|
@ -528,25 +599,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) |
|
|
|
|
*/ |
|
|
|
|
CHECK_FOR_INTERRUPTS(); |
|
|
|
|
|
|
|
|
|
if (unlikely(!scan->rs_inited)) |
|
|
|
|
/*
|
|
|
|
|
* If the scan direction is changing, reset the prefetch block to the |
|
|
|
|
* current block. Otherwise, we will incorrectly prefetch the blocks |
|
|
|
|
* between the prefetch block and the current block again before |
|
|
|
|
* prefetching blocks in the new, correct scan direction. |
|
|
|
|
*/ |
|
|
|
|
if (unlikely(scan->rs_dir != dir)) |
|
|
|
|
{ |
|
|
|
|
scan->rs_cblock = heapgettup_initial_block(scan, dir); |
|
|
|
|
scan->rs_prefetch_block = scan->rs_cblock; |
|
|
|
|
read_stream_reset(scan->rs_read_stream); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ |
|
|
|
|
Assert(scan->rs_cblock != InvalidBlockNumber || |
|
|
|
|
!BufferIsValid(scan->rs_cbuf)); |
|
|
|
|
scan->rs_dir = dir; |
|
|
|
|
|
|
|
|
|
scan->rs_inited = true; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, |
|
|
|
|
dir); |
|
|
|
|
|
|
|
|
|
/* read block if valid */ |
|
|
|
|
if (BlockNumberIsValid(scan->rs_cblock)) |
|
|
|
|
scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, |
|
|
|
|
scan->rs_cblock, RBM_NORMAL, |
|
|
|
|
scan->rs_strategy); |
|
|
|
|
scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); |
|
|
|
|
if (BufferIsValid(scan->rs_cbuf)) |
|
|
|
|
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -560,6 +629,7 @@ static pg_noinline BlockNumber |
|
|
|
|
heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) |
|
|
|
|
{ |
|
|
|
|
Assert(!scan->rs_inited); |
|
|
|
|
Assert(scan->rs_base.rs_parallel == NULL); |
|
|
|
|
|
|
|
|
|
/* When there are no pages to scan, return InvalidBlockNumber */ |
|
|
|
|
if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) |
|
|
|
|
@ -567,27 +637,10 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) |
|
|
|
|
|
|
|
|
|
if (ScanDirectionIsForward(dir)) |
|
|
|
|
{ |
|
|
|
|
/* serial scan */ |
|
|
|
|
if (scan->rs_base.rs_parallel == NULL) |
|
|
|
|
return scan->rs_startblock; |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
/* parallel scan */ |
|
|
|
|
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, |
|
|
|
|
scan->rs_parallelworkerdata, |
|
|
|
|
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); |
|
|
|
|
|
|
|
|
|
/* may return InvalidBlockNumber if there are no more blocks */ |
|
|
|
|
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, |
|
|
|
|
scan->rs_parallelworkerdata, |
|
|
|
|
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); |
|
|
|
|
} |
|
|
|
|
return scan->rs_startblock; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
/* backward parallel scan not supported */ |
|
|
|
|
Assert(scan->rs_base.rs_parallel == NULL); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Disable reporting to syncscan logic in a backwards scan; it's not |
|
|
|
|
* very likely anyone else is doing the same thing at the same time, |
|
|
|
|
@ -699,50 +752,43 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft, |
|
|
|
|
static inline BlockNumber |
|
|
|
|
heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir) |
|
|
|
|
{ |
|
|
|
|
if (ScanDirectionIsForward(dir)) |
|
|
|
|
Assert(scan->rs_base.rs_parallel == NULL); |
|
|
|
|
|
|
|
|
|
if (likely(ScanDirectionIsForward(dir))) |
|
|
|
|
{ |
|
|
|
|
if (scan->rs_base.rs_parallel == NULL) |
|
|
|
|
{ |
|
|
|
|
block++; |
|
|
|
|
block++; |
|
|
|
|
|
|
|
|
|
/* wrap back to the start of the heap */ |
|
|
|
|
if (block >= scan->rs_nblocks) |
|
|
|
|
block = 0; |
|
|
|
|
/* wrap back to the start of the heap */ |
|
|
|
|
if (block >= scan->rs_nblocks) |
|
|
|
|
block = 0; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Report our new scan position for synchronization purposes. We |
|
|
|
|
* don't do that when moving backwards, however. That would just |
|
|
|
|
* mess up any other forward-moving scanners. |
|
|
|
|
* |
|
|
|
|
* Note: we do this before checking for end of scan so that the |
|
|
|
|
* final state of the position hint is back at the start of the |
|
|
|
|
* rel. That's not strictly necessary, but otherwise when you run |
|
|
|
|
* the same query multiple times the starting position would shift |
|
|
|
|
* a little bit backwards on every invocation, which is confusing. |
|
|
|
|
* We don't guarantee any specific ordering in general, though. |
|
|
|
|
*/ |
|
|
|
|
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) |
|
|
|
|
ss_report_location(scan->rs_base.rs_rd, block); |
|
|
|
|
|
|
|
|
|
/* we're done if we're back at where we started */ |
|
|
|
|
if (block == scan->rs_startblock) |
|
|
|
|
return InvalidBlockNumber; |
|
|
|
|
/*
|
|
|
|
|
* Report our new scan position for synchronization purposes. We don't |
|
|
|
|
* do that when moving backwards, however. That would just mess up any |
|
|
|
|
* other forward-moving scanners. |
|
|
|
|
* |
|
|
|
|
* Note: we do this before checking for end of scan so that the final |
|
|
|
|
* state of the position hint is back at the start of the rel. That's |
|
|
|
|
* not strictly necessary, but otherwise when you run the same query |
|
|
|
|
* multiple times the starting position would shift a little bit |
|
|
|
|
* backwards on every invocation, which is confusing. We don't |
|
|
|
|
* guarantee any specific ordering in general, though. |
|
|
|
|
*/ |
|
|
|
|
if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) |
|
|
|
|
ss_report_location(scan->rs_base.rs_rd, block); |
|
|
|
|
|
|
|
|
|
/* check if the limit imposed by heap_setscanlimits() is met */ |
|
|
|
|
if (scan->rs_numblocks != InvalidBlockNumber) |
|
|
|
|
{ |
|
|
|
|
if (--scan->rs_numblocks == 0) |
|
|
|
|
return InvalidBlockNumber; |
|
|
|
|
} |
|
|
|
|
/* we're done if we're back at where we started */ |
|
|
|
|
if (block == scan->rs_startblock) |
|
|
|
|
return InvalidBlockNumber; |
|
|
|
|
|
|
|
|
|
return block; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
/* check if the limit imposed by heap_setscanlimits() is met */ |
|
|
|
|
if (scan->rs_numblocks != InvalidBlockNumber) |
|
|
|
|
{ |
|
|
|
|
return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, |
|
|
|
|
scan->rs_parallelworkerdata, (ParallelBlockTableScanDesc) |
|
|
|
|
scan->rs_base.rs_parallel); |
|
|
|
|
if (--scan->rs_numblocks == 0) |
|
|
|
|
return InvalidBlockNumber; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return block; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
@ -879,6 +925,7 @@ continue_page: |
|
|
|
|
|
|
|
|
|
scan->rs_cbuf = InvalidBuffer; |
|
|
|
|
scan->rs_cblock = InvalidBlockNumber; |
|
|
|
|
scan->rs_prefetch_block = InvalidBlockNumber; |
|
|
|
|
tuple->t_data = NULL; |
|
|
|
|
scan->rs_inited = false; |
|
|
|
|
} |
|
|
|
|
@ -974,6 +1021,7 @@ continue_page: |
|
|
|
|
ReleaseBuffer(scan->rs_cbuf); |
|
|
|
|
scan->rs_cbuf = InvalidBuffer; |
|
|
|
|
scan->rs_cblock = InvalidBlockNumber; |
|
|
|
|
scan->rs_prefetch_block = InvalidBlockNumber; |
|
|
|
|
tuple->t_data = NULL; |
|
|
|
|
scan->rs_inited = false; |
|
|
|
|
} |
|
|
|
|
@ -1069,6 +1117,33 @@ heap_beginscan(Relation relation, Snapshot snapshot, |
|
|
|
|
|
|
|
|
|
initscan(scan, key, false); |
|
|
|
|
|
|
|
|
|
scan->rs_read_stream = NULL; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set up a read stream for sequential scans and TID range scans. This |
|
|
|
|
* should be done after initscan() because initscan() allocates the |
|
|
|
|
* BufferAccessStrategy object passed to the streaming read API. |
|
|
|
|
*/ |
|
|
|
|
if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || |
|
|
|
|
scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) |
|
|
|
|
{ |
|
|
|
|
ReadStreamBlockNumberCB cb; |
|
|
|
|
|
|
|
|
|
if (scan->rs_base.rs_parallel) |
|
|
|
|
cb = heap_scan_stream_read_next_parallel; |
|
|
|
|
else |
|
|
|
|
cb = heap_scan_stream_read_next_serial; |
|
|
|
|
|
|
|
|
|
scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, |
|
|
|
|
scan->rs_strategy, |
|
|
|
|
scan->rs_base.rs_rd, |
|
|
|
|
MAIN_FORKNUM, |
|
|
|
|
cb, |
|
|
|
|
scan, |
|
|
|
|
0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return (TableScanDesc) scan; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -1111,6 +1186,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, |
|
|
|
|
|
|
|
|
|
Assert(scan->rs_empty_tuples_pending == 0); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* The read stream is reset on rescan. This must be done before |
|
|
|
|
* initscan(), as some state referred to by read_stream_reset() is reset |
|
|
|
|
* in initscan(). |
|
|
|
|
*/ |
|
|
|
|
if (scan->rs_read_stream) |
|
|
|
|
read_stream_reset(scan->rs_read_stream); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* reinitialize scan descriptor |
|
|
|
|
*/ |
|
|
|
|
@ -1135,6 +1218,12 @@ heap_endscan(TableScanDesc sscan) |
|
|
|
|
|
|
|
|
|
Assert(scan->rs_empty_tuples_pending == 0); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Must free the read stream before freeing the BufferAccessStrategy. |
|
|
|
|
*/ |
|
|
|
|
if (scan->rs_read_stream) |
|
|
|
|
read_stream_end(scan->rs_read_stream); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* decrement relation reference count and free scan descriptor storage |
|
|
|
|
*/ |
|
|
|
|
|