@ -16,9 +16,11 @@
# include "postgres.h"
# include "access/htup_details.h"
# include "access/relation.h"
# include "catalog/index.h"
# include "catalog/namespace.h"
# include "catalog/pg_class.h"
# include "catalog/pg_database.h"
# include "funcapi.h"
# include "miscadmin.h"
@ -29,6 +31,7 @@
# include "utils/builtins.h"
# include "utils/lsyscache.h"
# include "utils/rel.h"
# include "utils/syscache.h"
/*
* Ensure that a given argument is not null .
@ -119,53 +122,84 @@ stats_check_arg_pair(FunctionCallInfo fcinfo,
}
/*
* Lock relation in ShareUpdateExclusive mode , check privileges , and close the
* relation ( but retain the lock ) .
*
* A role has privileges to set statistics on the relation if any of the
* following are true :
* - the role owns the current database and the relation is not shared
* - the role has the MAINTAIN privilege on the relation
*/
void
stats_lock_check_privileges ( Oid reloid )
RangeVarCallbackForStats ( const RangeVar * relation ,
Oid relId , Oid oldRelId , void * arg )
{
Relation table ;
Oid table_oid = reloid ;
Oid index_oid = InvalidOid ;
LOCKMODE index_lockmode = NoLock ;
Oid * locked_oid = ( Oid * ) arg ;
Oid table_oid = relId ;
HeapTuple tuple ;
Form_pg_class form ;
char relkind ;
/*
* For indexes , we follow the locking behavior in do_analyze_rel ( ) and
* check_lock_if_inplace_updateable_rel ( ) , which is to lock the table
* first in ShareUpdateExclusive mode and then the index in AccessShare
* mode .
*
* Partitioned indexes are treated differently than normal indexes in
* check_lock_if_inplace_updateable_rel ( ) , so we take a
* ShareUpdateExclusive lock on both the partitioned table and the
* partitioned index .
* If we previously locked some other index ' s heap , and the name we ' re
* looking up no longer refers to that relation , release the now - useless
* lock .
*/
switch ( get_rel_relkind ( rel oid) )
if ( relId ! = oldRelId & & OidIsValid ( * locked_oid ) )
{
case RELKIND_INDEX :
index_oid = reloid ;
table_oid = IndexGetRelation ( index_oid , false ) ;
index_lockmode = AccessShareLock ;
break ;
case RELKIND_PARTITIONED_INDEX :
index_oid = reloid ;
table_oid = IndexGetRelation ( index_oid , false ) ;
index_lockmode = ShareUpdateExclusiveLock ;
break ;
default :
break ;
UnlockRelationOid ( * locked_oid , ShareUpdateExclusiveLock ) ;
* locked_oid = InvalidOid ;
}
/* If the relation does not exist, there's nothing more to do. */
if ( ! OidIsValid ( relId ) )
return ;
/* If the relation does exist, check whether it's an index. */
relkind = get_rel_relkind ( relId ) ;
if ( relkind = = RELKIND_INDEX | |
relkind = = RELKIND_PARTITIONED_INDEX )
table_oid = IndexGetRelation ( relId , false ) ;
/*
* If retrying yields the same OID , there are a couple of extremely
* unlikely scenarios we need to handle .
*/
if ( relId = = oldRelId )
{
/*
* If a previous lookup found an index , but the current lookup did
* not , the index was dropped and the OID was reused for something
* else between lookups . In theory , we could simply drop our lock on
* the index ' s parent table and proceed , but in the interest of
* avoiding complexity , we just error .
*/
if ( table_oid = = relId & & OidIsValid ( * locked_oid ) )
ereport ( ERROR ,
( errcode ( ERRCODE_UNDEFINED_OBJECT ) ,
errmsg ( " index \" %s \" was concurrently dropped " ,
relation - > relname ) ) ) ;
/*
* If the current lookup found an index but a previous lookup either
* did not find an index or found one with a different parent
* relation , the relation was dropped and the OID was reused for an
* index between lookups . RangeVarGetRelidExtended ( ) will have
* already locked the index at this point , so we can ' t just lock the
* newly discovered parent table OID without risking deadlock . As
* above , we just error in this case .
*/
if ( table_oid ! = relId & & table_oid ! = * locked_oid )
ereport ( ERROR ,
( errcode ( ERRCODE_UNDEFINED_OBJECT ) ,
errmsg ( " index \" %s \" was concurrently created " ,
relation - > relname ) ) ) ;
}
table = relation_open ( table_oid , ShareUpdateExclusiveLock ) ;
tuple = SearchSysCache1 ( RELOID , ObjectIdGetDatum ( table_oid ) ) ;
if ( ! HeapTupleIsValid ( tuple ) )
elog ( ERROR , " cache lookup failed for OID %u " , table_oid ) ;
form = ( Form_pg_class ) GETSTRUCT ( tuple ) ;
/* the relkinds that can be used with ANALYZE */
switch ( table - > rd_rel - > relkind )
switch ( form - > relkind )
{
case RELKIND_RELATION :
case RELKIND_MATVIEW :
@ -176,62 +210,36 @@ stats_lock_check_privileges(Oid reloid)
ereport ( ERROR ,
( errcode ( ERRCODE_WRONG_OBJECT_TYPE ) ,
errmsg ( " cannot modify statistics for relation \" %s \" " ,
RelationGetRelationName ( tabl e) ) ,
errdetail_relkind_not_supported ( table - > rd_rel - > relkind ) ) ) ;
NameStr ( form - > relnam e) ) ,
errdetail_relkind_not_supported ( form - > relkind ) ) ) ;
}
if ( OidIsValid ( index_oid ) )
{
Relation index ;
Assert ( index_lockmode ! = NoLock ) ;
index = relation_open ( index_oid , index_lockmode ) ;
Assert ( index - > rd_index & & index - > rd_index - > indrelid = = table_oid ) ;
/* retain lock on index */
relation_close ( index , NoLock ) ;
}
if ( table - > rd_rel - > relisshared )
if ( form - > relisshared )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot modify statistics for shared relation " ) ) ) ;
/* Check permissions */
if ( ! object_ownercheck ( DatabaseRelationId , MyDatabaseId , GetUserId ( ) ) )
{
AclResult aclresult = pg_class_aclcheck ( RelationGetRelid ( table ) ,
AclResult aclresult = pg_class_aclcheck ( table_oid ,
GetUserId ( ) ,
ACL_MAINTAIN ) ;
if ( aclresult ! = ACLCHECK_OK )
aclcheck_error ( aclresult ,
get_relkind_objtype ( table - > rd_rel - > relkind ) ,
NameStr ( table - > rd_rel - > relname ) ) ;
get_relkind_objtype ( form - > relkind ) ,
NameStr ( form - > relname ) ) ;
}
/* retain lock on table */
relation_close ( table , NoLock ) ;
}
ReleaseSysCache ( tuple ) ;
/*
* Lookup relation oid from schema and relation name .
*/
Oid
stats_lookup_relid ( const char * nspname , const char * relname )
{
Oid nspoid ;
Oid reloid ;
nspoid = LookupExplicitNamespace ( nspname , false ) ;
reloid = get_relname_relid ( relname , nspoid ) ;
if ( ! OidIsValid ( reloid ) )
ereport ( ERROR ,
( errcode ( ERRCODE_UNDEFINED_TABLE ) ,
errmsg ( " relation \" %s.%s \" does not exist " ,
nspname , relname ) ) ) ;
return reloid ;
/* Lock heap before index to avoid deadlock. */
if ( relId ! = oldRelId & & table_oid ! = relId )
{
LockRelationOid ( table_oid , ShareUpdateExclusiveLock ) ;
* locked_oid = table_oid ;
}
}