@ -328,7 +328,8 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
LOCKMODE lockmode ) ;
static ObjectAddress ATExecAlterConstraint ( Relation rel , AlterTableCmd * cmd ,
bool recurse , bool recursing , LOCKMODE lockmode ) ;
static ObjectAddress ATExecValidateConstraint ( Relation rel , char * constrName ,
static ObjectAddress ATExecValidateConstraint ( List * * wqueue ,
Relation rel , char * constrName ,
bool recurse , bool recursing , LOCKMODE lockmode ) ;
static int transformColumnNameList ( Oid relId , List * colList ,
int16 * attnums , Oid * atttypids ) ;
@ -342,7 +343,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
static void checkFkeyPermissions ( Relation rel , int16 * attnums , int natts ) ;
static CoercionPathType findFkeyCast ( Oid targetTypeId , Oid sourceTypeId ,
Oid * funcid ) ;
static void validateCheckConstraint ( Relation rel , HeapTuple constrtup ) ;
static void validateForeignKeyConstraint ( char * conname ,
Relation rel , Relation pkrel ,
Oid pkindOid , Oid constraintOid ) ;
@ -4500,13 +4500,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
address = ATExecAlterConstraint ( rel , cmd , false , false , lockmode ) ;
break ;
case AT_ValidateConstraint : /* VALIDATE CONSTRAINT */
address = ATExecValidateConstraint ( rel , cmd - > name , fals e , false ,
lockmode ) ;
address = ATExecValidateConstraint ( wqueue , rel , cmd - > name , false ,
false , lockmode ) ;
break ;
case AT_ValidateConstraintRecurse : /* VALIDATE CONSTRAINT with
* recursion */
address = ATExecValidateConstraint ( rel , cmd - > name , true , fals e ,
lockmode ) ;
address = ATExecValidateConstraint ( wqueue , rel , cmd - > name , true ,
false , lockmode ) ;
break ;
case AT_DropConstraint : /* DROP CONSTRAINT */
ATExecDropConstraint ( rel , cmd - > name , cmd - > behavior ,
@ -9727,8 +9727,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
* was already validated , InvalidObjectAddress is returned .
*/
static ObjectAddress
ATExecValidateConstraint ( Relation rel , char * constrName , bool recurs e ,
bool recursing , LOCKMODE lockmode )
ATExecValidateConstraint ( List * * wqueue , Relation rel , char * constrName ,
bool recurse , bool recurs ing , LOCKMODE lockmode )
{
Relation conrel ;
SysScanDesc scan ;
@ -9774,27 +9774,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
if ( ! con - > convalidated )
{
AlteredTableInfo * tab ;
HeapTuple copyTuple ;
Form_pg_constraint copy_con ;
if ( con - > contype = = CONSTRAINT_FOREIGN )
{
Relation refrel ;
NewConstraint * newcon ;
Constraint * fkconstraint ;
/*
* Triggers are already in place on both tables , so a concurrent
* write that alters the result here is not possible . Normally we
* can run a query here to do the validation , which would only
* require AccessShareLock . In some cases , it is possible that we
* might need to fire triggers to perform the check , so we take a
* lock at RowShareLock level just in case .
*/
refrel = table_open ( con - > confrelid , RowShareLock ) ;
/* Queue validation for phase 3 */
fkconstraint = makeNode ( Constraint ) ;
/* for now this is all we need */
fkconstraint - > conname = constrName ;
validateForeignKeyConstraint ( constrName , rel , refrel ,
con - > conindid ,
con - > oid ) ;
table_close ( refrel , NoLock ) ;
newcon = ( NewConstraint * ) palloc0 ( sizeof ( NewConstraint ) ) ;
newcon - > name = constrName ;
newcon - > contype = CONSTR_FOREIGN ;
newcon - > refrelid = con - > confrelid ;
newcon - > refindid = con - > conindid ;
newcon - > conid = con - > oid ;
newcon - > qual = ( Node * ) fkconstraint ;
/* Find or create work queue entry for this table */
tab = ATGetQueueEntry ( wqueue , rel ) ;
tab - > constraints = lappend ( tab - > constraints , newcon ) ;
/*
* We disallow creating invalid foreign keys to or from
@ -9805,6 +9809,10 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
{
List * children = NIL ;
ListCell * child ;
NewConstraint * newcon ;
bool isnull ;
Datum val ;
char * conbin ;
/*
* If we ' re recursing , the parent has already done this , so skip
@ -9844,12 +9852,30 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
/* find_all_inheritors already got lock */
childrel = table_open ( childoid , NoLock ) ;
ATExecValidateConstraint ( childrel , constrName , false ,
ATExecValidateConstraint ( wqueue , childrel , constrName , false ,
true , lockmode ) ;
table_close ( childrel , NoLock ) ;
}
validateCheckConstraint ( rel , tuple ) ;
/* Queue validation for phase 3 */
newcon = ( NewConstraint * ) palloc0 ( sizeof ( NewConstraint ) ) ;
newcon - > name = constrName ;
newcon - > contype = CONSTR_CHECK ;
newcon - > refrelid = InvalidOid ;
newcon - > refindid = InvalidOid ;
newcon - > conid = con - > oid ;
val = SysCacheGetAttr ( CONSTROID , tuple ,
Anum_pg_constraint_conbin , & isnull ) ;
if ( isnull )
elog ( ERROR , " null conbin for constraint %u " , con - > oid ) ;
conbin = TextDatumGetCString ( val ) ;
newcon - > qual = ( Node * ) stringToNode ( conbin ) ;
/* Find or create work queue entry for this table */
tab = ATGetQueueEntry ( wqueue , rel ) ;
tab - > constraints = lappend ( tab - > constraints , newcon ) ;
/*
* Invalidate relcache so that others see the new validated
@ -10223,87 +10249,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
}
}
/*
* Scan the existing rows in a table to verify they meet a proposed
* CHECK constraint .
*
* The caller must have opened and locked the relation appropriately .
*/
static void
validateCheckConstraint ( Relation rel , HeapTuple constrtup )
{
EState * estate ;
Datum val ;
char * conbin ;
Expr * origexpr ;
ExprState * exprstate ;
TableScanDesc scan ;
ExprContext * econtext ;
MemoryContext oldcxt ;
TupleTableSlot * slot ;
Form_pg_constraint constrForm ;
bool isnull ;
Snapshot snapshot ;
/*
* VALIDATE CONSTRAINT is a no - op for foreign tables and partitioned
* tables .
*/
if ( rel - > rd_rel - > relkind = = RELKIND_FOREIGN_TABLE | |
rel - > rd_rel - > relkind = = RELKIND_PARTITIONED_TABLE )
return ;
constrForm = ( Form_pg_constraint ) GETSTRUCT ( constrtup ) ;
estate = CreateExecutorState ( ) ;
/*
* XXX this tuple doesn ' t really come from a syscache , but this doesn ' t
* matter to SysCacheGetAttr , because it only wants to be able to fetch
* the tupdesc
*/
val = SysCacheGetAttr ( CONSTROID , constrtup , Anum_pg_constraint_conbin ,
& isnull ) ;
if ( isnull )
elog ( ERROR , " null conbin for constraint %u " ,
constrForm - > oid ) ;
conbin = TextDatumGetCString ( val ) ;
origexpr = ( Expr * ) stringToNode ( conbin ) ;
exprstate = ExecPrepareExpr ( origexpr , estate ) ;
econtext = GetPerTupleExprContext ( estate ) ;
slot = table_slot_create ( rel , NULL ) ;
econtext - > ecxt_scantuple = slot ;
snapshot = RegisterSnapshot ( GetLatestSnapshot ( ) ) ;
scan = table_beginscan ( rel , snapshot , 0 , NULL ) ;
/*
* Switch to per - tuple memory context and reset it for each tuple
* produced , so we don ' t leak memory .
*/
oldcxt = MemoryContextSwitchTo ( GetPerTupleMemoryContext ( estate ) ) ;
while ( table_scan_getnextslot ( scan , ForwardScanDirection , slot ) )
{
if ( ! ExecCheck ( exprstate , econtext ) )
ereport ( ERROR ,
( errcode ( ERRCODE_CHECK_VIOLATION ) ,
errmsg ( " check constraint \" %s \" of relation \" %s \" is violated by some row " ,
NameStr ( constrForm - > conname ) ,
RelationGetRelationName ( rel ) ) ,
errtableconstraint ( rel , NameStr ( constrForm - > conname ) ) ) ) ;
ResetExprContext ( econtext ) ;
}
MemoryContextSwitchTo ( oldcxt ) ;
table_endscan ( scan ) ;
UnregisterSnapshot ( snapshot ) ;
ExecDropSingleTupleTableSlot ( slot ) ;
FreeExecutorState ( estate ) ;
}
/*
* Scan the existing rows in a table to verify they meet a proposed FK
* constraint .