|
|
|
@ -8,7 +8,7 @@ |
|
|
|
|
* |
|
|
|
|
* |
|
|
|
|
* IDENTIFICATION |
|
|
|
|
* $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.9 2003/03/10 03:53:49 tgl Exp $ |
|
|
|
|
* $Header: /cvsroot/pgsql/src/backend/commands/portalcmds.c,v 1.10 2003/03/11 19:40:22 tgl Exp $ |
|
|
|
|
* |
|
|
|
|
*------------------------------------------------------------------------- |
|
|
|
|
*/ |
|
|
|
@ -23,6 +23,11 @@ |
|
|
|
|
#include "rewrite/rewriteHandler.h" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static long DoRelativeFetch(Portal portal, |
|
|
|
|
bool forward, |
|
|
|
|
long count, |
|
|
|
|
CommandDest dest); |
|
|
|
|
static void DoPortalRewind(Portal portal); |
|
|
|
|
static Portal PreparePortal(char *portalName); |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -102,9 +107,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) |
|
|
|
|
* PerformPortalFetch |
|
|
|
|
* Execute SQL FETCH or MOVE command. |
|
|
|
|
* |
|
|
|
|
* name: name of portal |
|
|
|
|
* forward: forward or backward fetch? |
|
|
|
|
* count: # of tuples to fetch (INT_MAX means "all"; 0 means "refetch") |
|
|
|
|
* stmt: parsetree node for command |
|
|
|
|
* dest: where to send results |
|
|
|
|
* completionTag: points to a buffer of size COMPLETION_TAG_BUFSIZE |
|
|
|
|
* in which to store a command completion status string. |
|
|
|
@ -112,9 +115,7 @@ PerformCursorOpen(DeclareCursorStmt *stmt, CommandDest dest) |
|
|
|
|
* completionTag may be NULL if caller doesn't want a status string. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
PerformPortalFetch(char *name, |
|
|
|
|
bool forward, |
|
|
|
|
long count, |
|
|
|
|
PerformPortalFetch(FetchStmt *stmt, |
|
|
|
|
CommandDest dest, |
|
|
|
|
char *completionTag) |
|
|
|
|
{ |
|
|
|
@ -123,48 +124,150 @@ PerformPortalFetch(char *name, |
|
|
|
|
|
|
|
|
|
/* initialize completion status in case of early exit */ |
|
|
|
|
if (completionTag) |
|
|
|
|
strcpy(completionTag, (dest == None) ? "MOVE 0" : "FETCH 0"); |
|
|
|
|
|
|
|
|
|
/* sanity checks */ |
|
|
|
|
if (name == NULL) |
|
|
|
|
{ |
|
|
|
|
elog(WARNING, "PerformPortalFetch: missing portal name"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
strcpy(completionTag, stmt->ismove ? "MOVE 0" : "FETCH 0"); |
|
|
|
|
|
|
|
|
|
/* get the portal from the portal name */ |
|
|
|
|
portal = GetPortalByName(name); |
|
|
|
|
portal = GetPortalByName(stmt->portalname); |
|
|
|
|
if (!PortalIsValid(portal)) |
|
|
|
|
{ |
|
|
|
|
elog(WARNING, "PerformPortalFetch: portal \"%s\" not found", |
|
|
|
|
name); |
|
|
|
|
stmt->portalname); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Do it */ |
|
|
|
|
nprocessed = DoPortalFetch(portal, forward, count, dest); |
|
|
|
|
nprocessed = DoPortalFetch(portal, |
|
|
|
|
stmt->direction, |
|
|
|
|
stmt->howMany, |
|
|
|
|
stmt->ismove ? None : dest); |
|
|
|
|
|
|
|
|
|
/* Return command status if wanted */ |
|
|
|
|
if (completionTag) |
|
|
|
|
snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %ld", |
|
|
|
|
(dest == None) ? "MOVE" : "FETCH", |
|
|
|
|
stmt->ismove ? "MOVE" : "FETCH", |
|
|
|
|
nprocessed); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* DoPortalFetch |
|
|
|
|
* Guts of PerformPortalFetch --- shared with SPI cursor operations |
|
|
|
|
* Guts of PerformPortalFetch --- shared with SPI cursor operations. |
|
|
|
|
* Caller must already have validated the Portal. |
|
|
|
|
* |
|
|
|
|
* Returns number of rows processed. |
|
|
|
|
* Returns number of rows processed (suitable for use in result tag) |
|
|
|
|
*/ |
|
|
|
|
long |
|
|
|
|
DoPortalFetch(Portal portal, bool forward, long count, CommandDest dest) |
|
|
|
|
DoPortalFetch(Portal portal, |
|
|
|
|
FetchDirection fdirection, |
|
|
|
|
long count, |
|
|
|
|
CommandDest dest) |
|
|
|
|
{ |
|
|
|
|
QueryDesc *queryDesc; |
|
|
|
|
EState *estate; |
|
|
|
|
MemoryContext oldcontext; |
|
|
|
|
ScanDirection direction; |
|
|
|
|
bool temp_desc = false; |
|
|
|
|
bool forward; |
|
|
|
|
|
|
|
|
|
switch (fdirection) |
|
|
|
|
{ |
|
|
|
|
case FETCH_FORWARD: |
|
|
|
|
if (count < 0) |
|
|
|
|
{ |
|
|
|
|
fdirection = FETCH_BACKWARD; |
|
|
|
|
count = -count; |
|
|
|
|
} |
|
|
|
|
/* fall out of switch to share code with FETCH_BACKWARD */ |
|
|
|
|
break; |
|
|
|
|
case FETCH_BACKWARD: |
|
|
|
|
if (count < 0) |
|
|
|
|
{ |
|
|
|
|
fdirection = FETCH_FORWARD; |
|
|
|
|
count = -count; |
|
|
|
|
} |
|
|
|
|
/* fall out of switch to share code with FETCH_FORWARD */ |
|
|
|
|
break; |
|
|
|
|
case FETCH_ABSOLUTE: |
|
|
|
|
if (count > 0) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Definition: Rewind to start, advance count-1 rows, return |
|
|
|
|
* next row (if any). In practice, if the goal is less than |
|
|
|
|
* halfway back to the start, it's better to scan from where |
|
|
|
|
* we are. In any case, we arrange to fetch the target row |
|
|
|
|
* going forwards. |
|
|
|
|
*/ |
|
|
|
|
if (portal->posOverflow || portal->portalPos == LONG_MAX || |
|
|
|
|
count-1 <= portal->portalPos / 2) |
|
|
|
|
{ |
|
|
|
|
DoPortalRewind(portal); |
|
|
|
|
if (count > 1) |
|
|
|
|
DoRelativeFetch(portal, true, count-1, None); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
long pos = portal->portalPos; |
|
|
|
|
|
|
|
|
|
if (portal->atEnd) |
|
|
|
|
pos++; /* need one extra fetch if off end */ |
|
|
|
|
if (count <= pos) |
|
|
|
|
DoRelativeFetch(portal, false, pos-count+1, None); |
|
|
|
|
else if (count > pos+1) |
|
|
|
|
DoRelativeFetch(portal, true, count-pos-1, None); |
|
|
|
|
} |
|
|
|
|
return DoRelativeFetch(portal, true, 1L, dest); |
|
|
|
|
} |
|
|
|
|
else if (count < 0) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Definition: Advance to end, back up abs(count)-1 rows, |
|
|
|
|
* return prior row (if any). We could optimize this if we |
|
|
|
|
* knew in advance where the end was, but typically we won't. |
|
|
|
|
* (Is it worth considering case where count > half of size |
|
|
|
|
* of query? We could rewind once we know the size ...) |
|
|
|
|
*/ |
|
|
|
|
DoRelativeFetch(portal, true, FETCH_ALL, None); |
|
|
|
|
if (count < -1) |
|
|
|
|
DoRelativeFetch(portal, false, -count-1, None); |
|
|
|
|
return DoRelativeFetch(portal, false, 1L, dest); |
|
|
|
|
} |
|
|
|
|
else /* count == 0 */ |
|
|
|
|
{ |
|
|
|
|
/* Rewind to start, return zero rows */ |
|
|
|
|
DoPortalRewind(portal); |
|
|
|
|
return DoRelativeFetch(portal, true, 0L, dest); |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case FETCH_RELATIVE: |
|
|
|
|
if (count > 0) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Definition: advance count-1 rows, return next row (if any). |
|
|
|
|
*/ |
|
|
|
|
if (count > 1) |
|
|
|
|
DoRelativeFetch(portal, true, count-1, None); |
|
|
|
|
return DoRelativeFetch(portal, true, 1L, dest); |
|
|
|
|
} |
|
|
|
|
else if (count < 0) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Definition: back up abs(count)-1 rows, return prior row |
|
|
|
|
* (if any). |
|
|
|
|
*/ |
|
|
|
|
if (count < -1) |
|
|
|
|
DoRelativeFetch(portal, false, -count-1, None); |
|
|
|
|
return DoRelativeFetch(portal, false, 1L, dest); |
|
|
|
|
} |
|
|
|
|
else /* count == 0 */ |
|
|
|
|
{ |
|
|
|
|
/* Same as FETCH FORWARD 0, so fall out of switch */ |
|
|
|
|
fdirection = FETCH_FORWARD; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
elog(ERROR, "DoPortalFetch: bogus direction"); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Get here with fdirection == FETCH_FORWARD or FETCH_BACKWARD, |
|
|
|
|
* and count >= 0. |
|
|
|
|
*/ |
|
|
|
|
forward = (fdirection == FETCH_FORWARD); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Zero count means to re-fetch the current row, if any (per SQL92) |
|
|
|
@ -174,7 +277,7 @@ DoPortalFetch(Portal portal, bool forward, long count, CommandDest dest) |
|
|
|
|
bool on_row; |
|
|
|
|
|
|
|
|
|
/* Are we sitting on a row? */ |
|
|
|
|
on_row = (portal->atStart == false && portal->atEnd == false); |
|
|
|
|
on_row = (!portal->atStart && !portal->atEnd); |
|
|
|
|
|
|
|
|
|
if (dest == None) |
|
|
|
|
{ |
|
|
|
@ -187,14 +290,12 @@ DoPortalFetch(Portal portal, bool forward, long count, CommandDest dest) |
|
|
|
|
* If we are sitting on a row, back up one so we can re-fetch it. |
|
|
|
|
* If we are not sitting on a row, we still have to start up and |
|
|
|
|
* shut down the executor so that the destination is initialized |
|
|
|
|
* and shut down correctly; so keep going. Further down in the |
|
|
|
|
* routine, count == 0 means we will retrieve no row. |
|
|
|
|
* and shut down correctly; so keep going. To DoRelativeFetch, |
|
|
|
|
* count == 0 means we will retrieve no row. |
|
|
|
|
*/ |
|
|
|
|
if (on_row) |
|
|
|
|
{ |
|
|
|
|
DoPortalFetch(portal, |
|
|
|
|
false /* backward */, 1L, |
|
|
|
|
None /* throw away output */); |
|
|
|
|
DoRelativeFetch(portal, false, 1L, None); |
|
|
|
|
/* Set up to fetch one row forward */ |
|
|
|
|
count = 1; |
|
|
|
|
forward = true; |
|
|
|
@ -203,9 +304,44 @@ DoPortalFetch(Portal portal, bool forward, long count, CommandDest dest) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* switch into the portal context |
|
|
|
|
* Optimize MOVE BACKWARD ALL into a Rewind. |
|
|
|
|
*/ |
|
|
|
|
oldcontext = MemoryContextSwitchTo(PortalGetHeapMemory(portal)); |
|
|
|
|
if (!forward && count == FETCH_ALL && dest == None) |
|
|
|
|
{ |
|
|
|
|
long result = portal->portalPos; |
|
|
|
|
|
|
|
|
|
if (result > 0 && !portal->atEnd) |
|
|
|
|
result--; |
|
|
|
|
DoPortalRewind(portal); |
|
|
|
|
/* result is bogus if pos had overflowed, but it's best we can do */ |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return DoRelativeFetch(portal, forward, count, dest); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* DoRelativeFetch |
|
|
|
|
* Do fetch for a simple N-rows-forward-or-backward case. |
|
|
|
|
* |
|
|
|
|
* count <= 0 is interpreted as a no-op: the destination gets started up |
|
|
|
|
* and shut down, but nothing else happens. Also, count == FETCH_ALL is |
|
|
|
|
* interpreted as "all rows". |
|
|
|
|
* |
|
|
|
|
* Caller must already have validated the Portal. |
|
|
|
|
* |
|
|
|
|
* Returns number of rows processed (suitable for use in result tag) |
|
|
|
|
*/ |
|
|
|
|
static long |
|
|
|
|
DoRelativeFetch(Portal portal, |
|
|
|
|
bool forward, |
|
|
|
|
long count, |
|
|
|
|
CommandDest dest) |
|
|
|
|
{ |
|
|
|
|
QueryDesc *queryDesc; |
|
|
|
|
EState *estate; |
|
|
|
|
ScanDirection direction; |
|
|
|
|
QueryDesc temp_queryDesc; |
|
|
|
|
|
|
|
|
|
queryDesc = PortalGetQueryDesc(portal); |
|
|
|
|
estate = queryDesc->estate; |
|
|
|
@ -224,12 +360,9 @@ DoPortalFetch(Portal portal, bool forward, long count, CommandDest dest) |
|
|
|
|
if (dest != queryDesc->dest && |
|
|
|
|
!(queryDesc->dest == RemoteInternal && dest == Remote)) |
|
|
|
|
{ |
|
|
|
|
QueryDesc *qdesc = (QueryDesc *) palloc(sizeof(QueryDesc)); |
|
|
|
|
|
|
|
|
|
memcpy(qdesc, queryDesc, sizeof(QueryDesc)); |
|
|
|
|
qdesc->dest = dest; |
|
|
|
|
queryDesc = qdesc; |
|
|
|
|
temp_desc = true; |
|
|
|
|
memcpy(&temp_queryDesc, queryDesc, sizeof(QueryDesc)); |
|
|
|
|
temp_queryDesc.dest = dest; |
|
|
|
|
queryDesc = &temp_queryDesc; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -240,65 +373,101 @@ DoPortalFetch(Portal portal, bool forward, long count, CommandDest dest) |
|
|
|
|
* robust about being called again if they've already returned NULL |
|
|
|
|
* once.) Then call the executor (we must not skip this, because the |
|
|
|
|
* destination needs to see a setup and shutdown even if no tuples are |
|
|
|
|
* available). Finally, update the atStart/atEnd state depending on |
|
|
|
|
* available). Finally, update the portal position state depending on |
|
|
|
|
* the number of tuples that were retrieved. |
|
|
|
|
*/ |
|
|
|
|
if (forward) |
|
|
|
|
{ |
|
|
|
|
if (portal->atEnd || count == 0) |
|
|
|
|
if (portal->atEnd || count <= 0) |
|
|
|
|
direction = NoMovementScanDirection; |
|
|
|
|
else |
|
|
|
|
direction = ForwardScanDirection; |
|
|
|
|
|
|
|
|
|
/* In the executor, zero count processes all portal rows */ |
|
|
|
|
if (count == INT_MAX) |
|
|
|
|
/* In the executor, zero count processes all rows */ |
|
|
|
|
if (count == FETCH_ALL) |
|
|
|
|
count = 0; |
|
|
|
|
|
|
|
|
|
ExecutorRun(queryDesc, direction, count); |
|
|
|
|
|
|
|
|
|
if (direction != NoMovementScanDirection) |
|
|
|
|
{ |
|
|
|
|
long oldPos; |
|
|
|
|
|
|
|
|
|
if (estate->es_processed > 0) |
|
|
|
|
portal->atStart = false; /* OK to back up now */ |
|
|
|
|
if (count <= 0 || (long) estate->es_processed < count) |
|
|
|
|
portal->atStart = false; /* OK to go backward now */ |
|
|
|
|
if (count == 0 || |
|
|
|
|
(unsigned long) estate->es_processed < (unsigned long) count) |
|
|
|
|
portal->atEnd = true; /* we retrieved 'em all */ |
|
|
|
|
oldPos = portal->portalPos; |
|
|
|
|
portal->portalPos += estate->es_processed; |
|
|
|
|
/* portalPos doesn't advance when we fall off the end */ |
|
|
|
|
if (portal->portalPos < oldPos) |
|
|
|
|
portal->posOverflow = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
if (!portal->backwardOK) |
|
|
|
|
elog(ERROR, "Cursor cannot scan backwards" |
|
|
|
|
elog(ERROR, "Cursor can only scan forward" |
|
|
|
|
"\n\tDeclare it with SCROLL option to enable backward scan"); |
|
|
|
|
|
|
|
|
|
if (portal->atStart || count == 0) |
|
|
|
|
if (portal->atStart || count <= 0) |
|
|
|
|
direction = NoMovementScanDirection; |
|
|
|
|
else |
|
|
|
|
direction = BackwardScanDirection; |
|
|
|
|
|
|
|
|
|
/* In the executor, zero count processes all portal rows */ |
|
|
|
|
if (count == INT_MAX) |
|
|
|
|
/* In the executor, zero count processes all rows */ |
|
|
|
|
if (count == FETCH_ALL) |
|
|
|
|
count = 0; |
|
|
|
|
|
|
|
|
|
ExecutorRun(queryDesc, direction, count); |
|
|
|
|
|
|
|
|
|
if (direction != NoMovementScanDirection) |
|
|
|
|
{ |
|
|
|
|
if (estate->es_processed > 0) |
|
|
|
|
if (estate->es_processed > 0 && portal->atEnd) |
|
|
|
|
{ |
|
|
|
|
portal->atEnd = false; /* OK to go forward now */ |
|
|
|
|
if (count <= 0 || (long) estate->es_processed < count) |
|
|
|
|
portal->portalPos++; /* adjust for endpoint case */ |
|
|
|
|
} |
|
|
|
|
if (count == 0 || |
|
|
|
|
(unsigned long) estate->es_processed < (unsigned long) count) |
|
|
|
|
{ |
|
|
|
|
portal->atStart = true; /* we retrieved 'em all */ |
|
|
|
|
portal->portalPos = 0; |
|
|
|
|
portal->posOverflow = false; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
long oldPos; |
|
|
|
|
|
|
|
|
|
oldPos = portal->portalPos; |
|
|
|
|
portal->portalPos -= estate->es_processed; |
|
|
|
|
if (portal->portalPos > oldPos || |
|
|
|
|
portal->portalPos <= 0) |
|
|
|
|
portal->posOverflow = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return estate->es_processed; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Clean up and switch back to old context. |
|
|
|
|
* DoPortalRewind - rewind a Portal to starting point |
|
|
|
|
*/ |
|
|
|
|
if (temp_desc) |
|
|
|
|
pfree(queryDesc); |
|
|
|
|
static void |
|
|
|
|
DoPortalRewind(Portal portal) |
|
|
|
|
{ |
|
|
|
|
QueryDesc *queryDesc; |
|
|
|
|
|
|
|
|
|
queryDesc = PortalGetQueryDesc(portal); |
|
|
|
|
|
|
|
|
|
MemoryContextSwitchTo(oldcontext); |
|
|
|
|
ExecutorRewind(queryDesc); |
|
|
|
|
|
|
|
|
|
return estate->es_processed; |
|
|
|
|
portal->atStart = true; |
|
|
|
|
portal->atEnd = false; |
|
|
|
|
portal->portalPos = 0; |
|
|
|
|
portal->posOverflow = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -310,15 +479,6 @@ PerformPortalClose(char *name) |
|
|
|
|
{ |
|
|
|
|
Portal portal; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* sanity checks ... why is this case allowed by the grammar, anyway? |
|
|
|
|
*/ |
|
|
|
|
if (name == NULL) |
|
|
|
|
{ |
|
|
|
|
elog(WARNING, "PerformPortalClose: missing portal name"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* get the portal from the portal name |
|
|
|
|
*/ |
|
|
|
|