@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $ Header : / cvsroot / pgsql / src / backend / commands / portalcmds . c , v 1.10 2003 / 03 / 11 19 : 40 : 22 tgl Exp $
* $ Header : / cvsroot / pgsql / src / backend / commands / portalcmds . c , v 1.11 2003 / 03 / 27 16 : 51 : 27 momjian Exp $
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -17,18 +17,23 @@
# include <limits.h>
# include "miscadmin.h"
# include "commands/portalcmds.h"
# include "executor/executor.h"
# include "optimizer/planner.h"
# include "rewrite/rewriteHandler.h"
# include "utils/memutils.h"
static long DoRelativeFetch ( Portal portal ,
bool forward ,
long count ,
CommandDest dest ) ;
static long DoRelativeStoreFetch ( Portal portal ,
bool forward ,
long count ,
CommandDest dest ) ;
static void DoPortalRewind ( Portal portal ) ;
static Portal PreparePortal ( char * portalName ) ;
static Portal PreparePortal ( DeclareCursorStmt * stmt ) ;
/*
@ -46,8 +51,15 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest)
char * cursorName ;
QueryDesc * queryDesc ;
/* Check for invalid context (must be in transaction block) */
RequireTransactionChain ( ( void * ) stmt , " DECLARE CURSOR " ) ;
/*
* If this is a non - holdable cursor , we ensure that this statement
* has been executed inside a transaction block ( or else , it would
* have no user - visible effect ) .
*
* XXX : surely there is a better way to check this ?
*/
if ( ! ( stmt - > options & CURSOR_OPT_HOLD ) )
RequireTransactionChain ( ( void * ) stmt , " DECLARE CURSOR " ) ;
/*
* The query has been through parse analysis , but not rewriting or
@ -76,7 +88,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest)
/*
* Create a portal and copy the query and plan into its memory context .
*/
portal = PreparePortal ( stmt - > portalname ) ;
portal = PreparePortal ( stmt ) ;
oldContext = MemoryContextSwitchTo ( PortalGetHeapMemory ( portal ) ) ;
query = copyObject ( query ) ;
@ -130,6 +142,7 @@ PerformPortalFetch(FetchStmt *stmt,
portal = GetPortalByName ( stmt - > portalname ) ;
if ( ! PortalIsValid ( portal ) )
{
/* FIXME: shouldn't this be an ERROR? */
elog ( WARNING , " PerformPortalFetch: portal \" %s \" not found " ,
stmt - > portalname ) ;
return ;
@ -343,6 +356,9 @@ DoRelativeFetch(Portal portal,
ScanDirection direction ;
QueryDesc temp_queryDesc ;
if ( portal - > holdStore )
return DoRelativeStoreFetch ( portal , forward , count , dest ) ;
queryDesc = PortalGetQueryDesc ( portal ) ;
estate = queryDesc - > estate ;
@ -407,7 +423,7 @@ DoRelativeFetch(Portal portal,
}
else
{
if ( ! portal - > backwardOK )
if ( portal - > scrollType = = DISABLE_SCROLL )
elog ( ERROR , " Cursor can only scan forward "
" \n \t Declare it with SCROLL option to enable backward scan " ) ;
@ -452,17 +468,85 @@ DoRelativeFetch(Portal portal,
return estate - > es_processed ;
}
/*
* DoRelativeStoreFetch
* Do fetch for a simple N - rows - forward - or - backward case , getting
* the results from the portal ' s tuple store .
*/
static long
DoRelativeStoreFetch ( Portal portal ,
bool forward ,
long count ,
CommandDest dest )
{
DestReceiver * destfunc ;
QueryDesc * queryDesc = portal - > queryDesc ;
long rows_fetched = 0 ;
if ( ! forward & & portal - > scrollType = = DISABLE_SCROLL )
elog ( ERROR , " Cursor can only scan forward "
" \n \t Declare it with SCROLL option to enable backward scan " ) ;
destfunc = DestToFunction ( dest ) ;
( * destfunc - > setup ) ( destfunc , queryDesc - > operation ,
portal - > name , queryDesc - > tupDesc ) ;
for ( ; ; )
{
HeapTuple tup ;
bool should_free ;
if ( rows_fetched > = count )
break ;
if ( portal - > atEnd & & forward )
break ;
if ( portal - > atStart & & ! forward )
break ;
tup = tuplestore_getheaptuple ( portal - > holdStore , forward , & should_free ) ;
if ( tup = = NULL )
{
if ( forward )
portal - > atEnd = true ;
else
portal - > atStart = true ;
break ;
}
( * destfunc - > receiveTuple ) ( tup , queryDesc - > tupDesc , destfunc ) ;
rows_fetched + + ;
if ( forward )
portal - > portalPos + + ;
else
portal - > portalPos - - ;
if ( forward & & portal - > atStart )
portal - > atStart = false ;
if ( ! forward & & portal - > atEnd )
portal - > atEnd = false ;
if ( should_free )
pfree ( tup ) ;
}
( * destfunc - > cleanup ) ( destfunc ) ;
return rows_fetched ;
}
/*
* DoPortalRewind - rewind a Portal to starting point
*/
static void
DoPortalRewind ( Portal portal )
{
QueryDesc * queryDesc ;
queryDesc = PortalGetQueryDesc ( portal ) ;
ExecutorRewind ( queryDesc ) ;
if ( portal - > holdStore )
tuplestore_rescan ( portal - > holdStore ) ;
else
ExecutorRewind ( PortalGetQueryDesc ( portal ) ) ;
portal - > atStart = true ;
portal - > atEnd = false ;
@ -493,22 +577,25 @@ PerformPortalClose(char *name)
/*
* Note : PortalCleanup is called as a side - effect
*/
PortalDrop ( portal ) ;
PortalDrop ( portal , false ) ;
}
/*
* PreparePortal
* Given a DECLARE CURSOR statement , returns the Portal data
* structure based on that statement that is used to manage the
* Portal internally . If a portal with specified name already
* exists , it is replaced .
*/
static Portal
PreparePortal ( char * portalName )
PreparePortal ( DeclareCursorStmt * stmt )
{
Portal portal ;
/*
* Check for already - in - use portal name .
*/
portal = GetPortalByName ( portalN ame) ;
portal = GetPortalByName ( stmt - > portaln ame) ;
if ( PortalIsValid ( portal ) )
{
/*
@ -516,19 +603,30 @@ PreparePortal(char *portalName)
* portal ?
*/
elog ( WARNING , " Closing pre-existing portal \" %s \" " ,
portalN ame) ;
PortalDrop ( portal ) ;
stmt - > portaln ame) ;
PortalDrop ( portal , false ) ;
}
/*
* Create the new portal .
*/
portal = CreatePortal ( portalName ) ;
portal = CreatePortal ( stmt - > portalname ) ;
/*
* Modify the newly created portal based on the options specified in
* the DECLARE CURSOR statement .
*/
if ( stmt - > options & CURSOR_OPT_SCROLL )
portal - > scrollType = ENABLE_SCROLL ;
else if ( stmt - > options & CURSOR_OPT_NO_SCROLL )
portal - > scrollType = DISABLE_SCROLL ;
if ( stmt - > options & CURSOR_OPT_HOLD )
portal - > holdOpen = true ;
return portal ;
}
/*
* PortalCleanup
*
@ -545,14 +643,128 @@ PortalCleanup(Portal portal)
AssertArg ( PortalIsValid ( portal ) ) ;
AssertArg ( portal - > cleanup = = PortalCleanup ) ;
if ( portal - > holdStore )
tuplestore_end ( portal - > holdStore ) ;
else
ExecutorEnd ( PortalGetQueryDesc ( portal ) ) ;
}
/*
* PersistHoldablePortal
*
* Prepare the specified Portal for access outside of the current
* transaction . When this function returns , all future accesses to the
* portal must be done via the Tuplestore ( not by invoking the
* executor ) .
*/
void
PersistHoldablePortal ( Portal portal )
{
MemoryContext oldcxt ;
QueryDesc * queryDesc = PortalGetQueryDesc ( portal ) ;
/*
* If we ' re preserving a holdable portal , we had better be
* inside the transaction that originally created it .
*/
Assert ( portal - > createXact = = GetCurrentTransactionId ( ) ) ;
Assert ( portal - > holdStore = = NULL ) ;
/*
* This context is used to store portal data that needs to persist
* between transactions .
*/
oldcxt = MemoryContextSwitchTo ( portal - > holdContext ) ;
/* XXX: Should SortMem be used for this? */
portal - > holdStore = tuplestore_begin_heap ( true , true , SortMem ) ;
/* Set the destination to output to the tuplestore */
queryDesc - > dest = Tuplestore ;
/*
* Rewind the executor : we need to store the entire result set in
* the tuplestore , so that subsequent backward FETCHs can be
* processed .
*/
ExecutorRewind ( queryDesc ) ;
/* Fetch the result set into the tuplestore */
ExecutorRun ( queryDesc , ForwardScanDirection , 0 ) ;
/*
* Reset the position in the result set : ideally , this could be
* implemented by just skipping straight to the tuple # that we need
* to be at , but the tuplestore API doesn ' t support that . So we
* start at the beginning of the tuplestore and iterate through it
* until we reach where we need to be .
*/
if ( ! portal - > atEnd )
{
int store_pos = 0 ;
bool should_free ;
tuplestore_rescan ( portal - > holdStore ) ;
while ( store_pos < portal - > portalPos )
{
HeapTuple tmp = tuplestore_gettuple ( portal - > holdStore ,
true , & should_free ) ;
if ( tmp = = NULL )
elog ( ERROR ,
" PersistHoldablePortal: unexpected end of tuple stream " ) ;
store_pos + + ;
/*
* This could probably be optimized by creating and then
* deleting a separate memory context for this series of
* operations .
*/
if ( should_free )
pfree ( tmp ) ;
}
}
/*
* tell the executor to shutdown the query
* The current Portal structure contains some data that will be
* needed by the holdable cursor , but it has been allocated in a
* memory context that is not sufficiently long - lived : we need to
* copy it into the portal ' s long - term memory context .
*/
ExecutorEnd ( PortalGetQueryDesc ( portal ) ) ;
{
TupleDesc tupDescCopy ;
QueryDesc * queryDescCopy ;
/*
* We need to use this order as ExecutorEnd invalidates the
* queryDesc ' s tuple descriptor
*/
tupDescCopy = CreateTupleDescCopy ( queryDesc - > tupDesc ) ;
ExecutorEnd ( queryDesc ) ;
queryDescCopy = palloc ( sizeof ( * queryDescCopy ) ) ;
/*
* This doesn ' t copy all the dependant data in the QueryDesc ,
* but that ' s okay - - the only complex field we need to keep is
* the query ' s tupledesc , which we ' ve copied ourselves .
*/
memcpy ( queryDescCopy , queryDesc , sizeof ( * queryDesc ) ) ;
FreeQueryDesc ( queryDesc ) ;
queryDescCopy - > tupDesc = tupDescCopy ;
portal - > queryDesc = queryDescCopy ;
}
/*
* This should be unnecessary since the querydesc should be in the
* portal ' s memory context , but do it anyway for symmetry .
* We no longer need the portal ' s short - term memory context .
*/
FreeQueryDesc ( PortalGetQueryDesc ( portal ) ) ;
MemoryContextDelete ( PortalGetHeapMemory ( portal ) ) ;
PortalGetHeapMemory ( portal ) = NULL ;
}