@ -23,16 +23,32 @@
# include "access/genam.h"
# include "access/heapam.h"
# include "access/multixact.h"
# include "access/rewriteheap.h"
# include "access/tableam.h"
# include "access/xact.h"
# include "catalog/catalog.h"
# include "catalog/index.h"
# include "catalog/storage.h"
# include "catalog/storage_xlog.h"
# include "commands/progress.h"
# include "executor/executor.h"
# include "pgstat.h"
# include "storage/bufmgr.h"
# include "storage/bufpage.h"
# include "storage/bufmgr.h"
# include "storage/lmgr.h"
# include "storage/predicate.h"
# include "storage/procarray.h"
# include "storage/smgr.h"
# include "utils/builtins.h"
# include "utils/rel.h"
static void
reform_and_rewrite_tuple ( HeapTuple tuple ,
Relation OldHeap , Relation NewHeap ,
Datum * values , bool * isnull , RewriteState rwstate ) ;
static const TableAmRoutine heapam_methods ;
@ -523,6 +539,388 @@ tuple_lock_retry:
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/*
 * Assign a fresh relfilenode to the relation: establish the initial freeze
 * and multixact horizons, create the physical storage, and, for unlogged
 * relations, create and immediately sync the init fork.
 */
static void
heapam_relation_set_new_filenode(Relation rel, char persistence,
								 TransactionId *freezeXid,
								 MultiXactId *minmulti)
{
	/*
	 * No transaction older than RecentXmin can still put tuples into the new
	 * storage, so RecentXmin is a safe initial relfrozenxid.
	 */
	*freezeXid = RecentXmin;

	/*
	 * Likewise compute a safe initial relminmxid.  Running transactions may
	 * reuse multixact values from their local caches, so every currently
	 * running multi has to be taken into account.
	 *
	 * XXX this could be refined further, but is it worth the hassle?
	 */
	*minmulti = GetOldestMultiXactId();

	RelationCreateStorage(rel->rd_node, persistence);

	/*
	 * An unlogged table needs an init fork so it can be correctly
	 * reinitialized on restart.  The creation must be WAL-logged even when
	 * wal_level=minimal, because recovery may otherwise remove the fork
	 * while replaying, e.g., a XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
	 * record.  An immediate sync is required even though the page has been
	 * logged: the write bypasses shared_buffers, so a concurrent checkpoint
	 * may already have moved the redo pointer past our xlog record.
	 */
	if (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
	{
		Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
			   rel->rd_rel->relkind == RELKIND_MATVIEW ||
			   rel->rd_rel->relkind == RELKIND_TOASTVALUE);

		RelationOpenSmgr(rel);
		smgrcreate(rel->rd_smgr, INIT_FORKNUM, false);
		log_smgrcreate(&rel->rd_smgr->smgr_rnode.node, INIT_FORKNUM);
		smgrimmedsync(rel->rd_smgr, INIT_FORKNUM);
	}
}
/*
 * Discard all data in the relation in place, without transactional safety;
 * callers must know that no rollback of the truncation will be needed.
 */
static void
heapam_relation_nontransactional_truncate(Relation rel)
{
	RelationTruncate(rel, 0);
}
/*
 * Copy the physical storage of 'rel' into a new relfilenode ('newrnode'),
 * e.g. for ALTER TABLE ... SET TABLESPACE.  Every existing fork is copied,
 * and the old storage is scheduled for unlinking.
 */
static void
heapam_relation_copy_data(Relation rel, RelFileNode newrnode)
{
	SMgrRelation dst;
	char		persistence = rel->rd_rel->relpersistence;
	ForkNumber	fork;

	dst = smgropen(newrnode, rel->rd_backend);
	RelationOpenSmgr(rel);

	/*
	 * Create the new storage first; a relfilenode collision, if any, is
	 * caught inside RelationCreateStorage().
	 */
	RelationCreateStorage(newrnode, persistence);

	/* The main fork always exists; copy it first. */
	RelationCopyStorage(rel->rd_smgr, dst, MAIN_FORKNUM, persistence);

	/* Copy whichever additional forks are present. */
	for (fork = MAIN_FORKNUM + 1; fork <= MAX_FORKNUM; fork++)
	{
		if (!smgrexists(rel->rd_smgr, fork))
			continue;

		smgrcreate(dst, fork, false);

		/*
		 * WAL-log the creation for persistent relations, and for the init
		 * fork of unlogged relations (which must survive crash recovery).
		 */
		if (persistence == RELPERSISTENCE_PERMANENT ||
			(persistence == RELPERSISTENCE_UNLOGGED &&
			 fork == INIT_FORKNUM))
			log_smgrcreate(&newrnode, fork);

		RelationCopyStorage(rel->rd_smgr, dst, fork, persistence);
	}

	/* Drop the old storage, and close the new relation. */
	RelationDropStorage(rel);
	smgrclose(dst);
}
/*
 * Copy the contents of OldHeap into NewHeap for CLUSTER / VACUUM FULL.
 *
 * Tuples are fetched either in OldIndex order, via a tuplesort (use_sort),
 * or by a plain sequential scan, and then pushed through the heap rewrite
 * module.  Dead tuples are dropped; counters for copied, vacuumed and
 * recently-dead tuples are returned through the output parameters.
 */
static void
heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
								 Relation OldIndex, bool use_sort,
								 TransactionId OldestXmin,
								 TransactionId FreezeXid,
								 MultiXactId MultiXactCutoff,
								 double *num_tuples,
								 double *tups_vacuumed,
								 double *tups_recently_dead)
{
	RewriteState rwstate;
	IndexScanDesc indexScan;
	TableScanDesc tableScan;
	HeapScanDesc heapScan;
	bool		use_wal;
	bool		is_system_catalog;
	Tuplesortstate *tuplesort;
	TupleDesc	oldTupDesc = RelationGetDescr(OldHeap);
	TupleDesc	newTupDesc = RelationGetDescr(NewHeap);
	TupleTableSlot *slot;
	int			natts;
	Datum	   *values;
	bool	   *isnull;
	BufferHeapTupleTableSlot *hslot;

	/* Remember if it's a system catalog */
	is_system_catalog = IsSystemRelation(OldHeap);

	/*
	 * We need to log the copied data in WAL iff WAL archiving/streaming is
	 * enabled AND it's a WAL-logged rel.
	 */
	use_wal = XLogIsNeeded() && RelationNeedsWAL(NewHeap);

	/* use_wal off requires smgr_targblock be initially invalid */
	Assert(RelationGetTargetBlock(NewHeap) == InvalidBlockNumber);

	/* Preallocate values/isnull arrays */
	natts = newTupDesc->natts;
	values = (Datum *) palloc(natts * sizeof(Datum));
	isnull = (bool *) palloc(natts * sizeof(bool));

	/* Initialize the rewrite operation */
	rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, FreezeXid,
								 MultiXactCutoff, use_wal);

	/* Set up sorting if wanted */
	if (use_sort)
		tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
											maintenance_work_mem,
											NULL, false);
	else
		tuplesort = NULL;

	/*
	 * Prepare to scan the OldHeap.  To ensure we see recently-dead tuples
	 * that still need to be copied, we scan with SnapshotAny and use
	 * HeapTupleSatisfiesVacuum for the visibility test.
	 */
	if (OldIndex != NULL && !use_sort)
	{
		const int	ci_index[] = {
			PROGRESS_CLUSTER_PHASE,
			PROGRESS_CLUSTER_INDEX_RELID
		};
		int64		ci_val[2];

		/* Set phase and OIDOldIndex to columns */
		ci_val[0] = PROGRESS_CLUSTER_PHASE_INDEX_SCAN_HEAP;
		ci_val[1] = RelationGetRelid(OldIndex);
		pgstat_progress_update_multi_param(2, ci_index, ci_val);

		tableScan = NULL;
		heapScan = NULL;
		indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, 0, 0);
		index_rescan(indexScan, NULL, 0, NULL, 0);
	}
	else
	{
		/* In scan-and-sort mode and also VACUUM FULL, set phase */
		pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
									 PROGRESS_CLUSTER_PHASE_SEQ_SCAN_HEAP);

		tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL);
		heapScan = (HeapScanDesc) tableScan;
		indexScan = NULL;

		/* Set total heap blocks */
		pgstat_progress_update_param(PROGRESS_CLUSTER_TOTAL_HEAP_BLKS,
									 heapScan->rs_nblocks);
	}

	slot = table_slot_create(OldHeap, NULL);
	hslot = (BufferHeapTupleTableSlot *) slot;

	/*
	 * Scan through the OldHeap, either in OldIndex order or sequentially;
	 * copy each tuple into the NewHeap, or transiently to the tuplesort
	 * module.  Note that we don't bother sorting dead tuples (they won't get
	 * to the new table anyway).
	 */
	for (;;)
	{
		HeapTuple	tuple;
		Buffer		buf;
		bool		isdead;

		CHECK_FOR_INTERRUPTS();

		if (indexScan != NULL)
		{
			if (!index_getnext_slot(indexScan, ForwardScanDirection, slot))
				break;

			/* Since we used no scan keys, should never need to recheck */
			if (indexScan->xs_recheck)
				elog(ERROR, "CLUSTER does not support lossy index conditions");
		}
		else
		{
			if (!table_scan_getnextslot(tableScan, ForwardScanDirection, slot))
				break;

			/*
			 * In scan-and-sort mode and also VACUUM FULL, set heap blocks
			 * scanned
			 */
			pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_BLKS_SCANNED,
										 heapScan->rs_cblock + 1);
		}

		tuple = ExecFetchSlotHeapTuple(slot, false, NULL);
		buf = hslot->buffer;

		/* Hold the buffer lock only for the visibility check. */
		LockBuffer(buf, BUFFER_LOCK_SHARE);

		switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))
		{
			case HEAPTUPLE_DEAD:
				/* Definitely dead */
				isdead = true;
				break;
			case HEAPTUPLE_RECENTLY_DEAD:
				*tups_recently_dead += 1;
				/* fall through */
			case HEAPTUPLE_LIVE:
				/* Live or recently dead, must copy it */
				isdead = false;
				break;
			case HEAPTUPLE_INSERT_IN_PROGRESS:

				/*
				 * Since we hold exclusive lock on the relation, normally the
				 * only way to see this is if it was inserted earlier in our
				 * own transaction.  However, it can happen in system
				 * catalogs, since we tend to release write lock before commit
				 * there.  Give a warning if neither case applies; but in any
				 * case we had better copy it.
				 */
				if (!is_system_catalog &&
					!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
					elog(WARNING, "concurrent insert in progress within table \"%s\"",
						 RelationGetRelationName(OldHeap));
				/* treat as live */
				isdead = false;
				break;
			case HEAPTUPLE_DELETE_IN_PROGRESS:

				/*
				 * Similar situation to INSERT_IN_PROGRESS case.
				 */
				if (!is_system_catalog &&
					!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data)))
					elog(WARNING, "concurrent delete in progress within table \"%s\"",
						 RelationGetRelationName(OldHeap));
				/* treat as recently dead */
				*tups_recently_dead += 1;
				isdead = false;
				break;
			default:
				elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
				isdead = false; /* keep compiler quiet */
				break;
		}

		LockBuffer(buf, BUFFER_LOCK_UNLOCK);

		if (isdead)
		{
			*tups_vacuumed += 1;
			/* heap rewrite module still needs to see it... */
			if (rewrite_heap_dead_tuple(rwstate, tuple))
			{
				/* A previous recently-dead tuple is now known dead */
				*tups_vacuumed += 1;
				*tups_recently_dead -= 1;
			}
			continue;
		}

		*num_tuples += 1;
		if (tuplesort != NULL)
		{
			tuplesort_putheaptuple(tuplesort, tuple);

			/*
			 * In scan-and-sort mode, report increase in number of tuples
			 * scanned
			 */
			pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_TUPLES_SCANNED,
										 *num_tuples);
		}
		else
		{
			const int	ct_index[] = {
				PROGRESS_CLUSTER_HEAP_TUPLES_SCANNED,
				PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN
			};
			int64		ct_val[2];

			reform_and_rewrite_tuple(tuple, OldHeap, NewHeap,
									 values, isnull, rwstate);

			/*
			 * In indexscan mode and also VACUUM FULL, report increase in
			 * number of tuples scanned and written
			 */
			ct_val[0] = *num_tuples;
			ct_val[1] = *num_tuples;
			pgstat_progress_update_multi_param(2, ct_index, ct_val);
		}
	}

	if (indexScan != NULL)
		index_endscan(indexScan);
	if (tableScan != NULL)
		table_endscan(tableScan);
	if (slot)
		ExecDropSingleTupleTableSlot(slot);

	/*
	 * In scan-and-sort mode, complete the sort, then read out all live tuples
	 * from the tuplestore and write them to the new relation.
	 */
	if (tuplesort != NULL)
	{
		double		n_tuples = 0;

		/* Report that we are now sorting tuples */
		pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
									 PROGRESS_CLUSTER_PHASE_SORT_TUPLES);

		tuplesort_performsort(tuplesort);

		/* Report that we are now writing new heap */
		pgstat_progress_update_param(PROGRESS_CLUSTER_PHASE,
									 PROGRESS_CLUSTER_PHASE_WRITE_NEW_HEAP);

		for (;;)
		{
			HeapTuple	tuple;

			CHECK_FOR_INTERRUPTS();

			tuple = tuplesort_getheaptuple(tuplesort, true);
			if (tuple == NULL)
				break;

			n_tuples += 1;
			reform_and_rewrite_tuple(tuple,
									 OldHeap, NewHeap,
									 values, isnull,
									 rwstate);
			/* Report n_tuples */
			pgstat_progress_update_param(PROGRESS_CLUSTER_HEAP_TUPLES_WRITTEN,
										 n_tuples);
		}

		tuplesort_end(tuplesort);
	}

	/* Write out any remaining tuples, and fsync if needed */
	end_heap_rewrite(rwstate);

	/* Clean up */
	pfree(values);
	pfree(isnull);
}
static double
heapam_index_build_range_scan ( Relation heapRelation ,
Relation indexRelation ,
@ -1256,6 +1654,55 @@ heapam_index_validate_scan(Relation heapRelation,
}
/* ----------------------------------------------------------------------------
* Helper functions for the above .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/*
 * Reconstruct and rewrite the given tuple.
 *
 * A raw byte-for-byte copy of the tuple would be wrong, for two reasons:
 *
 * 1. Values of dropped columns should be squeezed out, both to save space
 * and to avoid corner-case failures.  (It's possible, for example, that the
 * new table has no TOAST table and so cannot store any large values of
 * dropped columns.)
 *
 * 2. The tuple might not even be legal for the new table; this is currently
 * only known to happen as an after-effect of ALTER TABLE SET WITHOUT OIDS.
 *
 * Hence, deform the tuple into component Datums and build a fresh tuple
 * with the new relation's descriptor.
 */
static void
reform_and_rewrite_tuple(HeapTuple tuple,
						 Relation OldHeap, Relation NewHeap,
						 Datum *values, bool *isnull, RewriteState rwstate)
{
	TupleDesc	oldTupDesc = RelationGetDescr(OldHeap);
	TupleDesc	newTupDesc = RelationGetDescr(NewHeap);
	HeapTuple	newTuple;
	int			attno;

	heap_deform_tuple(tuple, oldTupDesc, values, isnull);

	/* Force any dropped columns to NULL in the rebuilt tuple */
	for (attno = 0; attno < newTupDesc->natts; attno++)
	{
		if (TupleDescAttr(newTupDesc, attno)->attisdropped)
			isnull[attno] = true;
	}

	newTuple = heap_form_tuple(newTupDesc, values, isnull);

	/* The heap rewrite module does the rest */
	rewrite_heap_tuple(rwstate, tuple, newTuple);

	heap_freetuple(newTuple);
}
/* ------------------------------------------------------------------------
* Definition of the heap table access method .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -1292,6 +1739,10 @@ static const TableAmRoutine heapam_methods = {
. tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot ,
. compute_xid_horizon_for_tuples = heap_compute_xid_horizon_for_tuples ,
. relation_set_new_filenode = heapam_relation_set_new_filenode ,
. relation_nontransactional_truncate = heapam_relation_nontransactional_truncate ,
. relation_copy_data = heapam_relation_copy_data ,
. relation_copy_for_cluster = heapam_relation_copy_for_cluster ,
. index_build_range_scan = heapam_index_build_range_scan ,
. index_validate_scan = heapam_index_validate_scan ,
} ;