@ -20,17 +20,24 @@
# include "catalog/catalog.h"
# include "catalog/indexing.h"
# include "catalog/namespace.h"
# include "catalog/pg_operator.h"
# include "commands/cluster.h"
# include "commands/matview.h"
# include "commands/tablecmds.h"
# include "commands/tablespace.h"
# include "executor/executor.h"
# include "executor/spi.h"
# include "miscadmin.h"
# include "parser/parse_relation.h"
# include "rewrite/rewriteHandler.h"
# include "storage/smgr.h"
# include "tcop/tcopprot.h"
# include "utils/builtins.h"
# include "utils/lsyscache.h"
# include "utils/rel.h"
# include "utils/snapmgr.h"
# include "utils/syscache.h"
# include "utils/typcache.h"
typedef struct
@ -44,12 +51,23 @@ typedef struct
BulkInsertState bistate ; /* bulk insert state */
} DR_transientrel ;
static int matview_maintenance_depth = 0 ;
static void transientrel_startup ( DestReceiver * self , int operation , TupleDesc typeinfo ) ;
static void transientrel_receive ( TupleTableSlot * slot , DestReceiver * self ) ;
static void transientrel_shutdown ( DestReceiver * self ) ;
static void transientrel_destroy ( DestReceiver * self ) ;
static void refresh_matview_datafill ( DestReceiver * dest , Query * query ,
const char * queryString ) ;
const char * queryString , Oid relowner ) ;
static char * make_temptable_name_n ( char * tempname , int n ) ;
static void mv_GenerateOper ( StringInfo buf , Oid opoid ) ;
static void refresh_by_match_merge ( Oid matviewOid , Oid tempOid ) ;
static void refresh_by_heap_swap ( Oid matviewOid , Oid OIDNewHeap ) ;
static void OpenMatViewIncrementalMaintenance ( void ) ;
static void CloseMatViewIncrementalMaintenance ( void ) ;
/*
* SetMatViewPopulatedState
@ -122,18 +140,21 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
RewriteRule * rule ;
List * actions ;
Query * dataQuery ;
Oid save_userid ;
int save_sec_context ;
int save_nestlevel ;
Oid tableSpace ;
Oid OIDNewHeap ;
DestReceiver * dest ;
bool concurrent ;
LOCKMODE lockmode ;
/* Determine strength of lock needed. */
concurrent = stmt - > concurrent ;
lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock ;
/*
* Get a lock until end of transaction .
*/
matviewOid = RangeVarGetRelidExtended ( stmt - > relation ,
AccessExclusiveLock , false , false ,
lockmode , false , false ,
RangeVarCallbackOwnsTable , NULL ) ;
matviewRel = heap_open ( matviewOid , NoLock ) ;
@ -144,11 +165,22 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
errmsg ( " \" %s \" is not a materialized view " ,
RelationGetRelationName ( matviewRel ) ) ) ) ;
/*
* We ' re not using materialized views in the system catalogs .
*/
/* Check that CONCURRENTLY is not specified if not populated. */
if ( concurrent & & ! RelationIsPopulated ( matviewRel ) )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " CONCURRENTLY cannot be used when the materialized view is not populated " ) ) ) ;
/* Check that conflicting options have not been specified. */
if ( concurrent & & stmt - > skipData )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " CONCURRENTLY and WITH NO DATA options cannot be used together " ) ) ) ;
/* We're not using materialized views in the system catalogs. */
Assert ( ! IsSystemRelation ( matviewRel ) ) ;
/* We don't allow an oid column for a materialized view. */
Assert ( ! matviewRel - > rd_rel - > relhasoids ) ;
/*
@ -194,48 +226,49 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
*/
CheckTableNotInUse ( matviewRel , " REFRESH MATERIALIZED VIEW " ) ;
/*
* Switch to the owner ' s userid , so that any functions are run as that
* user . Also lock down security - restricted operations and arrange to
* make GUC variable changes local to this command .
*/
GetUserIdAndSecContext ( & save_userid , & save_sec_context ) ;
SetUserIdAndSecContext ( matviewRel - > rd_rel - > relowner ,
save_sec_context | SECURITY_RESTRICTED_OPERATION ) ;
save_nestlevel = NewGUCNestLevel ( ) ;
/*
* Tentatively mark the matview as populated or not ( this will roll back
* if we fail later ) .
*/
SetMatViewPopulatedState ( matviewRel , ! stmt - > skipData ) ;
tableSpace = matviewRel - > rd_rel - > reltablespace ;
/* Concurrent refresh builds new data in temp tablespace, and does diff. */
if ( concurrent )
tableSpace = GetDefaultTablespace ( RELPERSISTENCE_TEMP ) ;
else
tableSpace = matviewRel - > rd_rel - > reltablespace ;
heap_close ( matviewRel , NoLock ) ;
/* Create the transient table that will receive the regenerated data. */
OIDNewHeap = make_new_heap ( matviewOid , tableSpace ) ;
OIDNewHeap = make_new_heap ( matviewOid , tableSpace , concurrent ,
ExclusiveLock ) ;
dest = CreateTransientRelDestReceiver ( OIDNewHeap ) ;
/* Generate the data, if wanted. */
if ( ! stmt - > skipData )
refresh_matview_datafill ( dest , dataQuery , queryString ) ;
/*
* Swap the physical files of the target and transient tables , then
* rebuild the target ' s indexes and throw away the transient table .
*/
finish_heap_swap ( matviewOid , OIDNewHeap , false , false , true , true ,
RecentXmin , ReadNextMultiXactId ( ) ) ;
RelationCacheInvalidateEntry ( matviewOid ) ;
/* Roll back any GUC changes */
AtEOXact_GUC ( false , save_nestlevel ) ;
/* Restore userid and security context */
SetUserIdAndSecContext ( save_userid , save_sec_context ) ;
refresh_matview_datafill ( dest , dataQuery , queryString ,
matviewRel - > rd_rel - > relowner ) ;
/* Make the matview match the newly generated data. */
if ( concurrent )
{
int old_depth = matview_maintenance_depth ;
PG_TRY ( ) ;
{
refresh_by_match_merge ( matviewOid , OIDNewHeap ) ;
}
PG_CATCH ( ) ;
{
matview_maintenance_depth = old_depth ;
PG_RE_THROW ( ) ;
}
PG_END_TRY ( ) ;
Assert ( matview_maintenance_depth = = old_depth ) ;
}
else
refresh_by_heap_swap ( matviewOid , OIDNewHeap ) ;
}
/*
@ -243,11 +276,24 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
*/
static void
refresh_matview_datafill ( DestReceiver * dest , Query * query ,
const char * queryString )
const char * queryString , Oid relowner )
{
List * rewritten ;
PlannedStmt * plan ;
QueryDesc * queryDesc ;
Oid save_userid ;
int save_sec_context ;
int save_nestlevel ;
/*
* Switch to the owner ' s userid , so that any functions are run as that
* user . Also lock down security - restricted operations and arrange to
* make GUC variable changes local to this command .
*/
GetUserIdAndSecContext ( & save_userid , & save_sec_context ) ;
SetUserIdAndSecContext ( relowner ,
save_sec_context | SECURITY_RESTRICTED_OPERATION ) ;
save_nestlevel = NewGUCNestLevel ( ) ;
/* Rewrite, copying the given Query to make sure it's not changed */
rewritten = QueryRewrite ( ( Query * ) copyObject ( query ) ) ;
@ -290,6 +336,12 @@ refresh_matview_datafill(DestReceiver *dest, Query *query,
FreeQueryDesc ( queryDesc ) ;
PopActiveSnapshot ( ) ;
/* Roll back any GUC changes */
AtEOXact_GUC ( false , save_nestlevel ) ;
/* Restore userid and security context */
SetUserIdAndSecContext ( save_userid , save_sec_context ) ;
}
DestReceiver *
@ -388,3 +440,401 @@ transientrel_destroy(DestReceiver *self)
{
pfree ( self ) ;
}
/*
* Given a qualified temporary table name , append an underscore followed by
* the given integer , to make a new table name based on the old one .
*
* This leaks memory through palloc ( ) , which won ' t be cleaned up until the
* current memory memory context is freed .
*/
static char *
make_temptable_name_n ( char * tempname , int n )
{
StringInfoData namebuf ;
initStringInfo ( & namebuf ) ;
appendStringInfoString ( & namebuf , tempname ) ;
appendStringInfo ( & namebuf , " _%i " , n ) ;
return namebuf . data ;
}
static void
mv_GenerateOper ( StringInfo buf , Oid opoid )
{
HeapTuple opertup ;
Form_pg_operator operform ;
opertup = SearchSysCache1 ( OPEROID , ObjectIdGetDatum ( opoid ) ) ;
if ( ! HeapTupleIsValid ( opertup ) )
elog ( ERROR , " cache lookup failed for operator %u " , opoid ) ;
operform = ( Form_pg_operator ) GETSTRUCT ( opertup ) ;
Assert ( operform - > oprkind = = ' b ' ) ;
appendStringInfo ( buf , " OPERATOR(%s.%s) " ,
quote_identifier ( get_namespace_name ( operform - > oprnamespace ) ) ,
NameStr ( operform - > oprname ) ) ;
ReleaseSysCache ( opertup ) ;
}
/*
* refresh_by_match_merge
*
* Refresh a materialized view with transactional semantics , while allowing
* concurrent reads .
*
* This is called after a new version of the data has been created in a
* temporary table . It performs a full outer join against the old version of
* the data , producing " diff " results . This join cannot work if there are any
* duplicated rows in either the old or new versions , in the sense that every
* column would compare as equal between the two rows . It does work correctly
* in the face of rows which have at least one NULL value , with all non - NULL
* columns equal . The behavior of NULLs on equality tests and on UNIQUE
* indexes turns out to be quite convenient here ; the tests we need to make
* are consistent with default behavior . If there is at least one UNIQUE
* index on the materialized view , we have exactly the guarantee we need . By
* joining based on equality on all columns which are part of any unique
* index , we identify the rows on which we can use UPDATE without any problem .
* If any column is NULL in either the old or new version of a row ( or both ) ,
* we must use DELETE and INSERT , since there could be multiple rows which are
* NOT DISTINCT FROM each other , and we could otherwise end up with the wrong
* number of occurrences in the updated relation . The temporary table used to
* hold the diff results contains just the TID of the old record ( if matched )
* and the ROW from the new table as a single column of complex record type
* ( if matched ) .
*
* Once we have the diff table , we perform set - based DELETE , UPDATE , and
* INSERT operations against the materialized view , and discard both temporary
* tables .
*
* Everything from the generation of the new data to applying the differences
* takes place under cover of an ExclusiveLock , since it seems as though we
* would want to prohibit not only concurrent REFRESH operations , but also
* incremental maintenance . It also doesn ' t seem reasonable or safe to allow
* SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
* this command .
*/
static void
refresh_by_match_merge ( Oid matviewOid , Oid tempOid )
{
StringInfoData querybuf ;
Relation matviewRel ;
Relation tempRel ;
char * matviewname ;
char * tempname ;
char * diffname ;
TupleDesc tupdesc ;
bool foundUniqueIndex ;
List * indexoidlist ;
ListCell * indexoidscan ;
int16 relnatts ;
bool * usedForQual ;
Oid save_userid ;
int save_sec_context ;
int save_nestlevel ;
initStringInfo ( & querybuf ) ;
matviewRel = heap_open ( matviewOid , NoLock ) ;
matviewname = quote_qualified_identifier ( get_namespace_name ( RelationGetNamespace ( matviewRel ) ) ,
RelationGetRelationName ( matviewRel ) ) ;
tempRel = heap_open ( tempOid , NoLock ) ;
tempname = quote_qualified_identifier ( get_namespace_name ( RelationGetNamespace ( tempRel ) ) ,
RelationGetRelationName ( tempRel ) ) ;
diffname = make_temptable_name_n ( tempname , 2 ) ;
relnatts = matviewRel - > rd_rel - > relnatts ;
usedForQual = ( bool * ) palloc0 ( sizeof ( bool ) * relnatts ) ;
/* Open SPI context. */
if ( SPI_connect ( ) ! = SPI_OK_CONNECT )
elog ( ERROR , " SPI_connect failed " ) ;
/* Analyze the temp table with the new contents. */
appendStringInfo ( & querybuf , " ANALYZE %s " , tempname ) ;
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_UTILITY )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
/*
* We need to ensure that there are not duplicate rows without NULLs in
* the new data set before we can count on the " diff " results . Check for
* that in a way that allows showing the first duplicated row found . Even
* after we pass this test , a unique index on the materialized view may
* find a duplicate key problem .
*/
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf ,
" SELECT x FROM %s x WHERE x IS NOT NULL AND EXISTS "
" (SELECT * FROM %s y WHERE y IS NOT NULL "
" AND (y.*) = (x.*) AND y.ctid <> x.ctid) LIMIT 1 " ,
tempname , tempname ) ;
if ( SPI_execute ( querybuf . data , false , 1 ) ! = SPI_OK_SELECT )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
if ( SPI_processed > 0 )
{
ereport ( ERROR ,
( errcode ( ERRCODE_CARDINALITY_VIOLATION ) ,
errmsg ( " new data for \" %s \" contains duplicate rows without any NULL columns " ,
RelationGetRelationName ( matviewRel ) ) ,
errdetail ( " Row: %s " ,
SPI_getvalue ( SPI_tuptable - > vals [ 0 ] , SPI_tuptable - > tupdesc , 1 ) ) ) ) ;
}
/* Start building the query for creating the diff table. */
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf ,
" CREATE TEMP TABLE %s AS "
" SELECT x.ctid AS tid, y FROM %s x FULL JOIN %s y ON ( " ,
diffname , matviewname , tempname ) ;
/*
* Get the list of index OIDs for the table from the relcache , and look up
* each one in the pg_index syscache . We will test for equality on all
* columns present in all unique indexes which only reference columns and
* include all rows .
*/
tupdesc = matviewRel - > rd_att ;
foundUniqueIndex = false ;
indexoidlist = RelationGetIndexList ( matviewRel ) ;
foreach ( indexoidscan , indexoidlist )
{
Oid indexoid = lfirst_oid ( indexoidscan ) ;
HeapTuple indexTuple ;
Form_pg_index index ;
indexTuple = SearchSysCache1 ( INDEXRELID , ObjectIdGetDatum ( indexoid ) ) ;
if ( ! HeapTupleIsValid ( indexTuple ) ) /* should not happen */
elog ( ERROR , " cache lookup failed for index %u " , indexoid ) ;
index = ( Form_pg_index ) GETSTRUCT ( indexTuple ) ;
/* We're only interested if it is unique and valid. */
if ( index - > indisunique & & IndexIsValid ( index ) )
{
int numatts = index - > indnatts ;
int i ;
bool expr = false ;
Relation indexRel ;
/* Skip any index on an expression. */
for ( i = 0 ; i < numatts ; i + + )
{
if ( index - > indkey . values [ i ] = = 0 )
{
expr = true ;
break ;
}
}
if ( expr )
{
ReleaseSysCache ( indexTuple ) ;
continue ;
}
/* Skip partial indexes. */
indexRel = index_open ( index - > indexrelid , RowExclusiveLock ) ;
if ( indexRel - > rd_indpred ! = NIL )
{
index_close ( indexRel , NoLock ) ;
ReleaseSysCache ( indexTuple ) ;
continue ;
}
/* Hold the locks, since we're about to run DML which needs them. */
index_close ( indexRel , NoLock ) ;
/* Add quals for all columns from this index. */
for ( i = 0 ; i < numatts ; i + + )
{
int attnum = index - > indkey . values [ i ] ;
Oid type ;
Oid op ;
const char * colname ;
/*
* Only include the column once regardless of how many times
* it shows up in how many indexes .
*
* This is also useful later to omit columns which can not
* have changed from the SET clause of the UPDATE statement .
*/
if ( usedForQual [ attnum - 1 ] )
continue ;
usedForQual [ attnum - 1 ] = true ;
/*
* Actually add the qual , ANDed with any others .
*/
if ( foundUniqueIndex )
appendStringInfoString ( & querybuf , " AND " ) ;
colname = quote_identifier ( NameStr ( ( tupdesc - > attrs [ attnum - 1 ] ) - > attname ) ) ;
appendStringInfo ( & querybuf , " y.%s " , colname ) ;
type = attnumTypeId ( matviewRel , attnum ) ;
op = lookup_type_cache ( type , TYPECACHE_EQ_OPR ) - > eq_opr ;
mv_GenerateOper ( & querybuf , op ) ;
appendStringInfo ( & querybuf , " x.%s " , colname ) ;
foundUniqueIndex = true ;
}
}
ReleaseSysCache ( indexTuple ) ;
}
list_free ( indexoidlist ) ;
if ( ! foundUniqueIndex )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " cannot refresh materialized view \" %s \" concurrently " ,
matviewname ) ,
errhint ( " Create a UNIQUE index with no WHERE clause on one or more columns of the materialized view. " ) ) ) ;
appendStringInfoString ( & querybuf ,
" AND y = x) WHERE (y.*) IS DISTINCT FROM (x.*) "
" ORDER BY tid " ) ;
/* Create the temporary "diff" table. */
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_UTILITY )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
/*
* We have no further use for data from the " full-data " temp table , but we
* must keep it around because its type is reference from the diff table .
*/
/* Analyze the diff table. */
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf , " ANALYZE %s " , diffname ) ;
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_UTILITY )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
OpenMatViewIncrementalMaintenance ( ) ;
/*
* Switch to the owner ' s userid , so that any functions are run as that
* user . Also lock down security - restricted operations and arrange to
* make GUC variable changes local to this command .
*/
GetUserIdAndSecContext ( & save_userid , & save_sec_context ) ;
SetUserIdAndSecContext ( matviewRel - > rd_rel - > relowner ,
save_sec_context | SECURITY_RESTRICTED_OPERATION ) ;
save_nestlevel = NewGUCNestLevel ( ) ;
/* Deletes must come before inserts; do them first. */
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf ,
" DELETE FROM %s WHERE ctid IN "
" (SELECT d.tid FROM %s d "
" WHERE d.tid IS NOT NULL "
" AND (d.y) IS NOT DISTINCT FROM NULL) " ,
matviewname , diffname ) ;
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_DELETE )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
/* Updates before inserts gives a better chance at HOT updates. */
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf , " UPDATE %s x SET " , matviewname ) ;
{
int i ;
bool targetColFound = false ;
for ( i = 0 ; i < tupdesc - > natts ; i + + )
{
const char * colname ;
if ( tupdesc - > attrs [ i ] - > attisdropped )
continue ;
if ( usedForQual [ i ] )
continue ;
if ( targetColFound )
appendStringInfoString ( & querybuf , " , " ) ;
targetColFound = true ;
colname = quote_identifier ( NameStr ( ( tupdesc - > attrs [ i ] ) - > attname ) ) ;
appendStringInfo ( & querybuf , " %s = (d.y).%s " , colname , colname ) ;
}
if ( targetColFound )
{
appendStringInfo ( & querybuf ,
" FROM %s d "
" WHERE d.tid IS NOT NULL AND x.ctid = d.tid " ,
diffname ) ;
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_UPDATE )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
}
}
/* Inserts go last. */
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf ,
" INSERT INTO %s SELECT (y).* FROM %s WHERE tid IS NULL " ,
matviewname , diffname ) ;
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_INSERT )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
/* Roll back any GUC changes */
AtEOXact_GUC ( false , save_nestlevel ) ;
/* Restore userid and security context */
SetUserIdAndSecContext ( save_userid , save_sec_context ) ;
/* We're done maintaining the materialized view. */
CloseMatViewIncrementalMaintenance ( ) ;
heap_close ( tempRel , NoLock ) ;
heap_close ( matviewRel , NoLock ) ;
/* Clean up temp tables. */
resetStringInfo ( & querybuf ) ;
appendStringInfo ( & querybuf , " DROP TABLE %s, %s " , diffname , tempname ) ;
if ( SPI_exec ( querybuf . data , 0 ) ! = SPI_OK_UTILITY )
elog ( ERROR , " SPI_exec failed: %s " , querybuf . data ) ;
/* Close SPI context. */
if ( SPI_finish ( ) ! = SPI_OK_FINISH )
elog ( ERROR , " SPI_finish failed " ) ;
}
/*
* Swap the physical files of the target and transient tables , then rebuild
* the target ' s indexes and throw away the transient table . Security context
* swapping is handled by the called function , so it is not needed here .
*/
static void
refresh_by_heap_swap ( Oid matviewOid , Oid OIDNewHeap )
{
finish_heap_swap ( matviewOid , OIDNewHeap , false , false , true , true ,
RecentXmin , ReadNextMultiXactId ( ) ) ;
RelationCacheInvalidateEntry ( matviewOid ) ;
}
static void
OpenMatViewIncrementalMaintenance ( void )
{
matview_maintenance_depth + + ;
}
static void
CloseMatViewIncrementalMaintenance ( void )
{
matview_maintenance_depth - - ;
Assert ( matview_maintenance_depth > = 0 ) ;
}
/*
* This should be used to test whether the backend is in a context where it is
* OK to allow DML statements to modify materialized views . We only want to
* allow that for internal code driven by the materialized view definition ,
* not for arbitrary user - supplied code .
*/
bool
MatViewIncrementalMaintenanceIsEnabled ( void )
{
return matview_maintenance_depth > 0 ;
}