@ -11,7 +11,7 @@
*
*
*
*
* IDENTIFICATION
* IDENTIFICATION
* $ Header : / cvsroot / pgsql / src / backend / commands / cluster . c , v 1.93 2002 / 11 / 11 22 : 19 : 21 tgl Exp $
* $ Header : / cvsroot / pgsql / src / backend / commands / cluster . c , v 1.94 2002 / 11 / 15 03 : 09 : 35 momjian Exp $
*
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
*/
@ -25,9 +25,11 @@
# include "catalog/index.h"
# include "catalog/index.h"
# include "catalog/indexing.h"
# include "catalog/indexing.h"
# include "catalog/catname.h"
# include "catalog/catname.h"
# include "catalog/namespace.h"
# include "commands/cluster.h"
# include "commands/cluster.h"
# include "commands/tablecmds.h"
# include "commands/tablecmds.h"
# include "miscadmin.h"
# include "miscadmin.h"
# include "utils/acl.h"
# include "utils/fmgroids.h"
# include "utils/fmgroids.h"
# include "utils/lsyscache.h"
# include "utils/lsyscache.h"
# include "utils/syscache.h"
# include "utils/syscache.h"
@ -48,12 +50,27 @@ typedef struct
bool isclustered ;
bool isclustered ;
} IndexAttrs ;
} IndexAttrs ;
/* This struct is used to pass around the information on tables to be
* clustered . We need this so we can make a list of them when invoked without
* a specific table / index pair .
*/
typedef struct
{
Oid tableOid ;
Oid indexOid ;
bool isPrevious ;
} relToCluster ;
static Oid make_new_heap ( Oid OIDOldHeap , const char * NewName ) ;
static Oid make_new_heap ( Oid OIDOldHeap , const char * NewName ) ;
static void copy_heap_data ( Oid OIDNewHeap , Oid OIDOldHeap , Oid OIDOldIndex ) ;
static void copy_heap_data ( Oid OIDNewHeap , Oid OIDOldHeap , Oid OIDOldIndex ) ;
static List * get_indexattr_list ( Relation OldHeap , Oid OldIndex ) ;
static List * get_indexattr_list ( Relation OldHeap , Oid OldIndex ) ;
static void recreate_indexattr ( Oid OIDOldHeap , List * indexes ) ;
static void recreate_indexattr ( Oid OIDOldHeap , List * indexes ) ;
static void swap_relfilenodes ( Oid r1 , Oid r2 ) ;
static void swap_relfilenodes ( Oid r1 , Oid r2 ) ;
static void cluster_rel ( relToCluster * rv ) ;
static bool check_cluster_ownership ( Oid relOid ) ;
static List * get_tables_to_cluster ( Oid owner ) ;
static MemoryContext cluster_context = NULL ;
/*
/*
* cluster
* cluster
@ -69,43 +86,70 @@ static void swap_relfilenodes(Oid r1, Oid r2);
* the new table , it ' s better to create the indexes afterwards than to fill
* the new table , it ' s better to create the indexes afterwards than to fill
* them incrementally while we load the table .
* them incrementally while we load the table .
*
*
* Permissions checks were done already .
* Since we may open a new transaction for each relation , we have to
* check that the relation still is what we think it is .
*/
*/
void
void
cluster ( RangeVar * oldrelation , char * oldindexname )
cluster_rel ( relToCluster * rvtc )
{
{
Oid OIDOldHeap ,
Oid OIDNewHeap ;
OIDOldIndex ,
OIDNewHeap ;
Relation OldHeap ,
Relation OldHeap ,
OldIndex ;
OldIndex ;
char NewHeapName [ NAMEDATALEN ] ;
char NewHeapName [ NAMEDATALEN ] ;
ObjectAddress object ;
ObjectAddress object ;
List * indexes ;
List * indexes ;
/* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS ( ) ;
/* Check if the relation and index still exist before opening them
*/
if ( ! SearchSysCacheExists ( RELOID ,
ObjectIdGetDatum ( rvtc - > tableOid ) ,
0 , 0 , 0 ) | |
! SearchSysCacheExists ( RELOID ,
ObjectIdGetDatum ( rvtc - > indexOid ) ,
0 , 0 , 0 ) )
return ;
/* Check that the user still owns the relation */
if ( ! check_cluster_ownership ( rvtc - > tableOid ) )
return ;
/* Check that the index is still the one with indisclustered set.
* If this is a standalone cluster , skip this test .
*/
if ( rvtc - > isPrevious )
{
HeapTuple tuple ;
Form_pg_index indexForm ;
tuple = SearchSysCache ( INDEXRELID ,
ObjectIdGetDatum ( rvtc - > indexOid ) ,
0 , 0 , 0 ) ;
indexForm = ( Form_pg_index ) GETSTRUCT ( tuple ) ;
if ( ! indexForm - > indisclustered )
{
ReleaseSysCache ( tuple ) ;
return ;
}
ReleaseSysCache ( tuple ) ;
}
/*
/*
* We grab exclusive access to the target rel and index for the
* We grab exclusive access to the target rel and index for the
* duration of the transaction .
* duration of the transaction .
*/
*/
OldHeap = heap_openrv ( oldrelation , AccessExclusiveLock ) ;
OldHeap = heap_open ( rvtc - > tableOid , AccessExclusiveLock ) ;
OIDOldHeap = RelationGetRelid ( OldHeap ) ;
/*
OldIndex = index_open ( rvtc - > indexOid ) ;
* The index is expected to be in the same namespace as the relation .
*/
OIDOldIndex = get_relname_relid ( oldindexname ,
RelationGetNamespace ( OldHeap ) ) ;
if ( ! OidIsValid ( OIDOldIndex ) )
elog ( ERROR , " CLUSTER: cannot find index \" %s \" for table \" %s \" " ,
oldindexname , RelationGetRelationName ( OldHeap ) ) ;
OldIndex = index_open ( OIDOldIndex ) ;
LockRelation ( OldIndex , AccessExclusiveLock ) ;
LockRelation ( OldIndex , AccessExclusiveLock ) ;
/*
/*
* Check that index is in fact an index on the given relation
* Check that index is in fact an index on the given relation
*/
*/
if ( OldIndex - > rd_index = = NULL | |
if ( OldIndex - > rd_index = = NULL | |
OldIndex - > rd_index - > indrelid ! = OIDOldHeap )
OldIndex - > rd_index - > indrelid ! = rvtc - > tableOid )
elog ( ERROR , " CLUSTER: \" %s \" is not an index for table \" %s \" " ,
elog ( ERROR , " CLUSTER: \" %s \" is not an index for table \" %s \" " ,
RelationGetRelationName ( OldIndex ) ,
RelationGetRelationName ( OldIndex ) ,
RelationGetRelationName ( OldHeap ) ) ;
RelationGetRelationName ( OldHeap ) ) ;
@ -122,7 +166,7 @@ cluster(RangeVar *oldrelation, char *oldindexname)
RelationGetRelationName ( OldHeap ) ) ;
RelationGetRelationName ( OldHeap ) ) ;
/* Save the information of all indexes on the relation. */
/* Save the information of all indexes on the relation. */
indexes = get_indexattr_list ( OldHeap , OIDOldIndex ) ;
indexes = get_indexattr_list ( OldHeap , rvtc - > indexOid ) ;
/* Drop relcache refcnts, but do NOT give up the locks */
/* Drop relcache refcnts, but do NOT give up the locks */
index_close ( OldIndex ) ;
index_close ( OldIndex ) ;
@ -136,9 +180,9 @@ cluster(RangeVar *oldrelation, char *oldindexname)
* namespace from the old , or we will have problems with the TEMP
* namespace from the old , or we will have problems with the TEMP
* status of temp tables .
* status of temp tables .
*/
*/
snprintf ( NewHeapName , NAMEDATALEN , " pg_temp_%u " , OIDOldHeap ) ;
snprintf ( NewHeapName , NAMEDATALEN , " pg_temp_%u " , rvtc - > tableOid ) ;
OIDNewHeap = make_new_heap ( OIDOldHeap , NewHeapName ) ;
OIDNewHeap = make_new_heap ( rvtc - > tableOid , NewHeapName ) ;
/*
/*
* We don ' t need CommandCounterIncrement ( ) because make_new_heap did
* We don ' t need CommandCounterIncrement ( ) because make_new_heap did
@ -148,13 +192,13 @@ cluster(RangeVar *oldrelation, char *oldindexname)
/*
/*
* Copy the heap data into the new table in the desired order .
* Copy the heap data into the new table in the desired order .
*/
*/
copy_heap_data ( OIDNewHeap , OIDOldHeap , OIDOldIndex ) ;
copy_heap_data ( OIDNewHeap , rvtc - > tableOid , rvtc - > indexOid ) ;
/* To make the new heap's data visible (probably not needed?). */
/* To make the new heap's data visible (probably not needed?). */
CommandCounterIncrement ( ) ;
CommandCounterIncrement ( ) ;
/* Swap the relfilenodes of the old and new heaps. */
/* Swap the relfilenodes of the old and new heaps. */
swap_relfilenodes ( OIDOldHeap , OIDNewHeap ) ;
swap_relfilenodes ( rvtc - > tableOid , OIDNewHeap ) ;
CommandCounterIncrement ( ) ;
CommandCounterIncrement ( ) ;
@ -175,7 +219,7 @@ cluster(RangeVar *oldrelation, char *oldindexname)
* Recreate each index on the relation . We do not need
* Recreate each index on the relation . We do not need
* CommandCounterIncrement ( ) because recreate_indexattr does it .
* CommandCounterIncrement ( ) because recreate_indexattr does it .
*/
*/
recreate_indexattr ( OIDOldHeap , indexes ) ;
recreate_indexattr ( rvtc - > tableOid , indexes ) ;
}
}
/*
/*
@ -571,3 +615,236 @@ swap_relfilenodes(Oid r1, Oid r2)
heap_close ( relRelation , RowExclusiveLock ) ;
heap_close ( relRelation , RowExclusiveLock ) ;
}
}
/*---------------------------------------------------------------------------
* This cluster code allows for clustering multiple tables at once . Because
* of this , we cannot just run everything on a single transaction , or we
* would be forced to acquire exclusive locks on all the tables being
* clustered . To solve this we follow a similar strategy to VACUUM code ,
* clustering each relation in a separate transaction . For this to work ,
* we need to :
* - provide a separate memory context so that we can pass information in
* a way that trascends transactions
* - start a new transaction every time a new relation is clustered
* - check for validity of the information on to - be - clustered relations ,
* as someone might have deleted a relation behind our back , or
* clustered one on a different index
* - end the transaction
*
* The single relation code does not have any overhead .
*
* We also allow a relation being specified without index . In that case ,
* the indisclustered bit will be looked up , and an ERROR will be thrown
* if there is no index with the bit set .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
cluster ( ClusterStmt * stmt )
{
/* This is the single relation case. */
if ( stmt - > relation ! = NULL )
{
Oid indexOid = InvalidOid ,
tableOid ;
relToCluster rvtc ;
HeapTuple tuple ;
Form_pg_class classForm ;
tableOid = RangeVarGetRelid ( stmt - > relation , false ) ;
if ( ! check_cluster_ownership ( tableOid ) )
elog ( ERROR , " CLUSTER: You do not own relation %s " ,
stmt - > relation - > relname ) ;
tuple = SearchSysCache ( RELOID ,
ObjectIdGetDatum ( tableOid ) ,
0 , 0 , 0 ) ;
if ( ! HeapTupleIsValid ( tuple ) )
elog ( ERROR , " Cache lookup failed for relation %u " , tableOid ) ;
classForm = ( Form_pg_class ) GETSTRUCT ( tuple ) ;
if ( stmt - > indexname = = NULL )
{
List * index ;
Relation rel = RelationIdGetRelation ( tableOid ) ;
HeapTuple ituple = NULL ,
idxtuple = NULL ;
/* We need to fetch the index that has indisclustered set. */
foreach ( index , RelationGetIndexList ( rel ) )
{
Form_pg_index indexForm ;
indexOid = lfirsti ( index ) ;
ituple = SearchSysCache ( RELOID ,
ObjectIdGetDatum ( indexOid ) ,
0 , 0 , 0 ) ;
if ( ! HeapTupleIsValid ( ituple ) )
elog ( ERROR , " Cache lookup failed for relation %u " , indexOid ) ;
idxtuple = SearchSysCache ( INDEXRELID ,
ObjectIdGetDatum ( HeapTupleGetOid ( ituple ) ) ,
0 , 0 , 0 ) ;
if ( ! HeapTupleIsValid ( idxtuple ) )
elog ( ERROR , " Cache lookup failed for index %u " , HeapTupleGetOid ( ituple ) ) ;
indexForm = ( Form_pg_index ) GETSTRUCT ( idxtuple ) ;
if ( indexForm - > indisclustered )
break ;
indexOid = InvalidOid ;
}
if ( indexOid = = InvalidOid )
elog ( ERROR , " CLUSTER: No previously clustered index found on table %s " ,
stmt - > relation - > relname ) ;
RelationClose ( rel ) ;
ReleaseSysCache ( ituple ) ;
ReleaseSysCache ( idxtuple ) ;
}
else
{
/* The index is expected to be in the same namespace as the relation. */
indexOid = get_relname_relid ( stmt - > indexname , classForm - > relnamespace ) ;
}
ReleaseSysCache ( tuple ) ;
/* XXX Maybe the namespace should be reported as well */
if ( ! OidIsValid ( indexOid ) )
elog ( ERROR , " CLUSTER: cannot find index \" %s \" for table \" %s \" " ,
stmt - > indexname , stmt - > relation - > relname ) ;
rvtc . tableOid = tableOid ;
rvtc . indexOid = indexOid ;
rvtc . isPrevious = false ;
/* Do the job */
cluster_rel ( & rvtc ) ;
}
else
{
/*
* This is the " no relation " case . We need to cluster all tables
* that have some index with indisclustered set .
*/
relToCluster * rvtc ;
List * rv ,
* rvs ;
/*
* We cannot run CLUSTER ALL inside a user transaction block ; if we were inside
* a transaction , then our commit - and start - transaction - command calls
* would not have the intended effect !
*/
if ( IsTransactionBlock ( ) )
elog ( ERROR , " CLUSTER cannot run inside a BEGIN/END block " ) ;
/* Running CLUSTER from a function would free the function context */
if ( ! MemoryContextContains ( QueryContext , stmt ) )
elog ( ERROR , " CLUSTER cannot be called from a function " ) ;
/*
* Create special memory context for cross - transaction storage .
*
* Since it is a child of QueryContext , it will go away even in case
* of error .
*/
cluster_context = AllocSetContextCreate ( QueryContext ,
" Cluster " ,
ALLOCSET_DEFAULT_MINSIZE ,
ALLOCSET_DEFAULT_INITSIZE ,
ALLOCSET_DEFAULT_MAXSIZE ) ;
/*
* Build the list of relations to cluster . Note that this lives in
* cluster_context .
*/
rvs = get_tables_to_cluster ( GetUserId ( ) ) ;
/* Ok, now that we've got them all, cluster them one by one */
foreach ( rv , rvs )
{
rvtc = ( relToCluster * ) lfirst ( rv ) ;
/* Start a new transaction for this relation. */
StartTransactionCommand ( true ) ;
cluster_rel ( rvtc ) ;
CommitTransactionCommand ( true ) ;
}
}
/* Start a new transaction for the cleanup work. */
StartTransactionCommand ( true ) ;
/* Clean up working storage */
if ( stmt - > relation = = NULL )
{
MemoryContextDelete ( cluster_context ) ;
cluster_context = NULL ;
}
}
/* Checks if the user owns the relation. Superusers
* are allowed to cluster any table .
*/
bool
check_cluster_ownership ( Oid relOid )
{
/* Superusers bypass this check */
return pg_class_ownercheck ( relOid , GetUserId ( ) ) ;
}
/* Get a list of tables that the current user owns and
* have indisclustered set . Return the list in a List * of rvsToCluster
* with the tableOid and the indexOid on which the table is already
* clustered .
*/
List *
get_tables_to_cluster ( Oid owner )
{
Relation indRelation ;
HeapScanDesc scan ;
ScanKeyData entry ;
HeapTuple indexTuple ;
Form_pg_index index ;
relToCluster * rvtc ;
List * rvs = NIL ;
/*
* Get all indexes that have indisclustered set . System
* relations or nailed - in relations cannot ever have
* indisclustered set , because CLUSTER will refuse to
* set it when called with one of them as argument .
*/
indRelation = relation_openr ( IndexRelationName , RowExclusiveLock ) ;
ScanKeyEntryInitialize ( & entry , 0 , Anum_pg_index_indisclustered ,
F_BOOLEQ , true ) ;
scan = heap_beginscan ( indRelation , SnapshotNow , 1 , & entry ) ;
while ( ( indexTuple = heap_getnext ( scan , ForwardScanDirection ) ) ! = NULL )
{
MemoryContext old_context = NULL ;
index = ( Form_pg_index ) GETSTRUCT ( indexTuple ) ;
if ( ! check_cluster_ownership ( index - > indrelid ) )
continue ;
/*
* We have to build the struct in a different memory context so
* it will survive the cross - transaction processing
*/
old_context = MemoryContextSwitchTo ( cluster_context ) ;
rvtc = ( relToCluster * ) palloc ( sizeof ( relToCluster ) ) ;
rvtc - > indexOid = index - > indexrelid ;
rvtc - > tableOid = index - > indrelid ;
rvtc - > isPrevious = true ;
rvs = lcons ( ( void * ) rvtc , rvs ) ;
MemoryContextSwitchTo ( old_context ) ;
}
heap_endscan ( scan ) ;
/*
* Release the lock on pg_index . We will check the indexes
* later again .
*
*/
relation_close ( indRelation , RowExclusiveLock ) ;
return rvs ;
}