mirror of https://github.com/postgres/postgres
parent
3152996ffb
commit
0d0254d1fb
@ -0,0 +1,534 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* spi.c-- |
||||
* Server Programming Interface |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#include "executor/spi.h" |
||||
#include "../backend/parser/parse.h" |
||||
#include "fmgr.h" |
||||
|
||||
typedef struct { |
||||
QueryTreeList *qtlist; /* malloced */ |
||||
uint32 processed; /* by Executor */ |
||||
SPITupleTable *tuptable; |
||||
Portal portal; /* portal per procedure */ |
||||
MemoryContext savedcntx; |
||||
CommandId savedId; |
||||
} _SPI_connection; |
||||
|
||||
static Portal _SPI_portal = (Portal) NULL; |
||||
static _SPI_connection *_SPI_stack = NULL; |
||||
static _SPI_connection *_SPI_current = NULL; |
||||
static int _SPI_connected = -1; |
||||
static int _SPI_curid = -1; |
||||
|
||||
uint32 SPI_processed = 0; |
||||
SPITupleTable *SPI_tuptable; |
||||
|
||||
void spi_printtup (HeapTuple tuple, TupleDesc tupdesc); |
||||
static int _SPI_pquery (QueryDesc *queryDesc); |
||||
#if 0 |
||||
static void _SPI_fetch (FetchStmt *stmt); |
||||
#endif |
||||
static int _SPI_begin_call (bool execmem); |
||||
static int _SPI_end_call (bool exfree, bool procmem); |
||||
static MemoryContext _SPI_execmem (void); |
||||
static MemoryContext _SPI_procmem (void); |
||||
static bool _SPI_checktuples (bool isRetrieveIntoRelation); |
||||
|
||||
|
||||
int |
||||
SPI_connect () |
||||
{ |
||||
char pname[64]; |
||||
PortalVariableMemory pvmem; |
||||
|
||||
/*
|
||||
* It's possible on startup and after commit/abort. |
||||
* In future we'll catch commit/abort in some way... |
||||
*/ |
||||
strcpy (pname, "<SPI manager>"); |
||||
_SPI_portal = GetPortalByName (pname); |
||||
if ( !PortalIsValid (_SPI_portal) ) |
||||
{ |
||||
if ( _SPI_stack != NULL ) /* there was abort */ |
||||
free (_SPI_stack); |
||||
_SPI_current = _SPI_stack = NULL; |
||||
_SPI_connected = _SPI_curid = -1; |
||||
SPI_processed = 0; |
||||
SPI_tuptable = NULL; |
||||
_SPI_portal = CreatePortal (pname); |
||||
if ( !PortalIsValid (_SPI_portal) ) |
||||
elog (FATAL, "SPI_connect: global initialization failed"); |
||||
} |
||||
|
||||
/*
|
||||
* When procedure called by Executor _SPI_curid expected to be |
||||
* equal to _SPI_connected |
||||
*/ |
||||
if ( _SPI_curid != _SPI_connected ) |
||||
return (SPI_ERROR_CONNECT); |
||||
|
||||
if ( _SPI_stack == NULL ) |
||||
{ |
||||
if ( _SPI_connected != -1 ) |
||||
elog (FATAL, "SPI_connect: no connection(s) expected"); |
||||
_SPI_stack = (_SPI_connection *) malloc (sizeof (_SPI_connection)); |
||||
} |
||||
else |
||||
{ |
||||
if ( _SPI_connected <= -1 ) |
||||
elog (FATAL, "SPI_connect: some connection(s) expected"); |
||||
_SPI_stack = (_SPI_connection *) realloc (_SPI_stack,
|
||||
(_SPI_connected + 1) * sizeof (_SPI_connection)); |
||||
} |
||||
/*
|
||||
* We' returning to procedure where _SPI_curid == _SPI_connected - 1 |
||||
*/ |
||||
_SPI_connected++; |
||||
|
||||
_SPI_current = &(_SPI_stack[_SPI_connected]); |
||||
_SPI_current->qtlist = NULL; |
||||
_SPI_current->processed = 0; |
||||
_SPI_current->tuptable = NULL; |
||||
|
||||
/* Create Portal for this procedure ... */ |
||||
sprintf (pname, "<SPI %d>", _SPI_connected); |
||||
_SPI_current->portal = CreatePortal (pname); |
||||
if ( !PortalIsValid (_SPI_current->portal) ) |
||||
elog (FATAL, "SPI_connect: initialization failed"); |
||||
|
||||
/* ... and switch to Portal' Variable memory - procedure' context */ |
||||
pvmem = PortalGetVariableMemory (_SPI_current->portal); |
||||
_SPI_current->savedcntx = MemoryContextSwitchTo ((MemoryContext)pvmem); |
||||
|
||||
_SPI_current->savedId = GetScanCommandId (); |
||||
SetScanCommandId (GetCurrentCommandId ()); |
||||
|
||||
return (SPI_OK_CONNECT); |
||||
|
||||
} |
||||
|
||||
int |
||||
SPI_finish () |
||||
{ |
||||
int res; |
||||
|
||||
res = _SPI_begin_call (false); /* live in procedure memory */ |
||||
if ( res < 0 ) |
||||
return (res); |
||||
|
||||
/* Restore memory context as it was before procedure call */ |
||||
MemoryContextSwitchTo (_SPI_current->savedcntx); |
||||
PortalDestroy (&(_SPI_current->portal)); |
||||
|
||||
SetScanCommandId (_SPI_current->savedId); |
||||
|
||||
/*
|
||||
* After _SPI_begin_call _SPI_connected == _SPI_curid. |
||||
* Now we are closing connection to SPI and returning to upper
|
||||
* Executor and so _SPI_connected must be equal to _SPI_curid. |
||||
*/ |
||||
_SPI_connected--; |
||||
_SPI_curid--; |
||||
if ( _SPI_connected == -1 ) |
||||
{ |
||||
free (_SPI_stack); |
||||
_SPI_stack = NULL; |
||||
} |
||||
else |
||||
{ |
||||
_SPI_stack = (_SPI_connection *) realloc (_SPI_stack,
|
||||
(_SPI_connected + 1) * sizeof (_SPI_connection)); |
||||
_SPI_current = &(_SPI_stack[_SPI_connected]); |
||||
} |
||||
|
||||
return (SPI_OK_FINISH); |
||||
|
||||
} |
||||
|
||||
int |
||||
SPI_exec (char *src) |
||||
{ |
||||
QueryTreeList *queryTree_list; |
||||
List *planTree_list; |
||||
QueryDesc *qdesc; |
||||
Query *queryTree; |
||||
Plan *planTree; |
||||
int res; |
||||
int i; |
||||
|
||||
res = _SPI_begin_call (true); |
||||
if ( res < 0 ) |
||||
return (res); |
||||
|
||||
/* Increment CommandCounter to see changes made by now */ |
||||
CommandCounterIncrement (); |
||||
StartPortalAllocMode (DefaultAllocMode, 0); |
||||
|
||||
SPI_processed = 0; |
||||
SPI_tuptable = NULL; |
||||
_SPI_current->tuptable = NULL; |
||||
|
||||
planTree_list = (List *) |
||||
pg_plan (src, NULL, 0, &queryTree_list, None); |
||||
|
||||
_SPI_current->qtlist = queryTree_list; |
||||
|
||||
for (i=0; i < queryTree_list->len - 1; i++) |
||||
{ |
||||
queryTree = (Query*) (queryTree_list->qtrees[i]); |
||||
planTree = lfirst(planTree_list); |
||||
|
||||
planTree_list = lnext (planTree_list); |
||||
|
||||
if ( queryTree->commandType == CMD_UTILITY ) |
||||
{ |
||||
if ( nodeTag (queryTree->utilityStmt ) == T_CopyStmt ) |
||||
{ |
||||
CopyStmt *stmt = (CopyStmt *)(queryTree->utilityStmt); |
||||
|
||||
if ( stmt->filename == NULL ) |
||||
{ |
||||
_SPI_end_call (true, true); |
||||
return (SPI_ERROR_COPY); |
||||
} |
||||
} |
||||
else if ( nodeTag (queryTree->utilityStmt ) == T_ClosePortalStmt ||
|
||||
nodeTag (queryTree->utilityStmt ) == T_FetchStmt ) |
||||
{ |
||||
_SPI_end_call (true, true); |
||||
return (SPI_ERROR_CURSOR); |
||||
} |
||||
else if ( nodeTag (queryTree->utilityStmt ) == T_TransactionStmt ) |
||||
{ |
||||
_SPI_end_call (true, true); |
||||
return (SPI_ERROR_TRANSACTION); |
||||
} |
||||
ProcessUtility (queryTree->utilityStmt, None); |
||||
} |
||||
else |
||||
ProcessQuery (queryTree, planTree, NULL, NULL, 0, None); |
||||
CommandCounterIncrement (); |
||||
} |
||||
|
||||
/*
|
||||
* Last query in list. Note that we don't call CommandCounterIncrement |
||||
* after last query - it will be done by up-level or by next call |
||||
* to SPI_exec. |
||||
*/ |
||||
queryTree = (Query*) (queryTree_list->qtrees[i]); |
||||
planTree = lfirst(planTree_list); |
||||
|
||||
if ( queryTree->commandType == CMD_UTILITY ) |
||||
{ |
||||
if ( nodeTag (queryTree->utilityStmt ) == T_CopyStmt ) |
||||
{ |
||||
CopyStmt *stmt = (CopyStmt *)(queryTree->utilityStmt); |
||||
|
||||
if ( stmt->filename == NULL ) |
||||
{ |
||||
_SPI_end_call (true, true); |
||||
return (SPI_ERROR_COPY); |
||||
} |
||||
} |
||||
#if 0 |
||||
else if ( nodeTag (queryTree->utilityStmt ) == T_FetchStmt ) |
||||
{ |
||||
_SPI_fetch ((FetchStmt *) (queryTree->utilityStmt)); |
||||
_SPI_end_call (true, true); |
||||
return (SPI_OK_FETCH); |
||||
} |
||||
#endif |
||||
else if ( nodeTag (queryTree->utilityStmt ) == T_ClosePortalStmt ||
|
||||
nodeTag (queryTree->utilityStmt ) == T_FetchStmt ) |
||||
{ |
||||
_SPI_end_call (true, true); |
||||
return (SPI_ERROR_CURSOR); |
||||
} |
||||
else if ( nodeTag (queryTree->utilityStmt ) == T_TransactionStmt ) |
||||
{ |
||||
_SPI_end_call (true, true); |
||||
return (SPI_ERROR_TRANSACTION); |
||||
} |
||||
ProcessUtility (queryTree->utilityStmt, None); |
||||
|
||||
_SPI_end_call (true, true); |
||||
return (SPI_OK_UTILITY); |
||||
} |
||||
|
||||
qdesc = CreateQueryDesc (queryTree, planTree, SPI); |
||||
|
||||
res = _SPI_pquery (qdesc); |
||||
|
||||
_SPI_end_call (true, true); |
||||
return (res); |
||||
|
||||
} |
||||
|
||||
static int |
||||
_SPI_pquery (QueryDesc *queryDesc) |
||||
{ |
||||
Query *parseTree; |
||||
Plan *plan; |
||||
int operation; |
||||
EState *state; |
||||
TupleDesc tupdesc; |
||||
bool isRetrieveIntoPortal = false; |
||||
bool isRetrieveIntoRelation = false; |
||||
char* intoName = NULL; |
||||
int res; |
||||
|
||||
parseTree = queryDesc->parsetree; |
||||
plan = queryDesc->plantree; |
||||
operation = queryDesc->operation; |
||||
|
||||
switch (operation) |
||||
{ |
||||
case CMD_SELECT: |
||||
res = SPI_OK_SELECT; |
||||
if (parseTree->isPortal) |
||||
{ |
||||
isRetrieveIntoPortal = true; |
||||
intoName = parseTree->into; |
||||
parseTree->isBinary = false; /* */ |
||||
|
||||
return (SPI_ERROR_CURSOR); |
||||
|
||||
} |
||||
else if (parseTree->into != NULL) /* select into table */ |
||||
{ |
||||
res = SPI_OK_SELINTO; |
||||
isRetrieveIntoRelation = true; |
||||
} |
||||
break; |
||||
case CMD_INSERT: |
||||
res = SPI_OK_INSERT; |
||||
break; |
||||
case CMD_DELETE: |
||||
res = SPI_OK_DELETE; |
||||
break; |
||||
case CMD_UPDATE: |
||||
res = SPI_OK_UPDATE; |
||||
break; |
||||
default: |
||||
return (SPI_ERROR_OPUNKNOWN); |
||||
} |
||||
|
||||
state = CreateExecutorState(); |
||||
|
||||
tupdesc = ExecutorStart(queryDesc, state); |
||||
|
||||
/* Don't work currently */ |
||||
if (isRetrieveIntoPortal) |
||||
{ |
||||
ProcessPortal(intoName, |
||||
parseTree, |
||||
plan, |
||||
state, |
||||
tupdesc, |
||||
None); |
||||
return (SPI_OK_CURSOR); |
||||
} |
||||
|
||||
ExecutorRun (queryDesc, state, EXEC_RUN, 0); |
||||
|
||||
_SPI_current->processed = state->es_processed; |
||||
if ( operation == CMD_SELECT ) |
||||
{ |
||||
if ( _SPI_checktuples (isRetrieveIntoRelation) ) |
||||
elog (FATAL, "SPI_select: # of processed tuples check failed"); |
||||
} |
||||
|
||||
ExecutorEnd (queryDesc, state); |
||||
|
||||
SPI_processed = _SPI_current->processed; |
||||
SPI_tuptable = _SPI_current->tuptable; |
||||
|
||||
return (res); |
||||
|
||||
} |
||||
|
||||
#if 0 |
||||
static void |
||||
_SPI_fetch (FetchStmt *stmt) |
||||
{ |
||||
char *name = stmt->portalname; |
||||
int feature = ( stmt->direction == FORWARD ) ? EXEC_FOR : EXEC_BACK; |
||||
int count = stmt->howMany; |
||||
Portal portal; |
||||
QueryDesc *queryDesc; |
||||
EState *state; |
||||
MemoryContext context; |
||||
|
||||
if ( name == NULL) |
||||
elog (FATAL, "SPI_fetch from blank portal unsupported"); |
||||
|
||||
portal = GetPortalByName (name); |
||||
if ( !PortalIsValid (portal) ) |
||||
elog (FATAL, "SPI_fetch: portal \"%s\" not found", name); |
||||
|
||||
context = MemoryContextSwitchTo((MemoryContext)PortalGetHeapMemory(portal)); |
||||
|
||||
queryDesc = PortalGetQueryDesc(portal); |
||||
state = PortalGetState(portal); |
||||
|
||||
ExecutorRun(queryDesc, state, feature, count); |
||||
|
||||
MemoryContextSwitchTo (context); /* switch to the normal Executor context */ |
||||
|
||||
_SPI_current->processed = state->es_processed; |
||||
if ( _SPI_checktuples (false) ) |
||||
elog (FATAL, "SPI_fetch: # of processed tuples check failed"); |
||||
|
||||
SPI_processed = _SPI_current->processed; |
||||
SPI_tuptable = _SPI_current->tuptable; |
||||
|
||||
} |
||||
#endif |
||||
|
||||
/*
|
||||
* spi_printtup -- |
||||
* store tuple retrieved by Executor into SPITupleTable |
||||
* of current SPI procedure |
||||
* |
||||
*/ |
||||
void |
||||
spi_printtup (HeapTuple tuple, TupleDesc tupdesc) |
||||
{ |
||||
SPITupleTable *tuptable; |
||||
MemoryContext oldcntx; |
||||
|
||||
/*
|
||||
* When called by Executor _SPI_curid expected to be |
||||
* equal to _SPI_connected |
||||
*/ |
||||
if ( _SPI_curid != _SPI_connected || _SPI_connected < 0 ) |
||||
elog (FATAL, "SPI: improper call to spi_printtup"); |
||||
if ( _SPI_current != &(_SPI_stack[_SPI_curid]) ) |
||||
elog (FATAL, "SPI: stack corrupted in spi_printtup"); |
||||
|
||||
oldcntx = _SPI_procmem (); /* switch to procedure memory context */ |
||||
|
||||
tuptable = _SPI_current->tuptable; |
||||
if ( tuptable == NULL ) |
||||
{ |
||||
_SPI_current->tuptable = tuptable = (SPITupleTable *) |
||||
palloc (sizeof (SPITupleTable)); |
||||
tuptable->alloced = tuptable->free = 128; |
||||
tuptable->vals = (HeapTuple *) palloc (tuptable->alloced * sizeof (HeapTuple)); |
||||
tuptable->tupdesc = CreateTupleDescCopy (tupdesc); |
||||
} |
||||
else if ( tuptable->free == 0 ) |
||||
{ |
||||
tuptable->free = 256; |
||||
tuptable->alloced += tuptable->free; |
||||
tuptable->vals = (HeapTuple *) repalloc (tuptable->vals, |
||||
tuptable->alloced * sizeof (HeapTuple)); |
||||
} |
||||
|
||||
tuptable->vals[tuptable->alloced - tuptable->free] = heap_copytuple (tuple); |
||||
(tuptable->free)--; |
||||
|
||||
MemoryContextSwitchTo (oldcntx); |
||||
return; |
||||
} |
||||
|
||||
static MemoryContext |
||||
_SPI_execmem () |
||||
{ |
||||
MemoryContext oldcntx; |
||||
PortalHeapMemory phmem; |
||||
|
||||
phmem = PortalGetHeapMemory (_SPI_current->portal); |
||||
oldcntx = MemoryContextSwitchTo ((MemoryContext)phmem); |
||||
|
||||
return (oldcntx); |
||||
|
||||
} |
||||
|
||||
static MemoryContext |
||||
_SPI_procmem () |
||||
{ |
||||
MemoryContext oldcntx; |
||||
PortalVariableMemory pvmem; |
||||
|
||||
pvmem = PortalGetVariableMemory (_SPI_current->portal); |
||||
oldcntx = MemoryContextSwitchTo ((MemoryContext)pvmem); |
||||
|
||||
return (oldcntx); |
||||
|
||||
} |
||||
|
||||
/*
|
||||
* _SPI_begin_call -- |
||||
* |
||||
*/ |
||||
static int |
||||
_SPI_begin_call (bool execmem) |
||||
{ |
||||
if ( _SPI_curid + 1 != _SPI_connected ) |
||||
return (SPI_ERROR_UNCONNECTED); |
||||
_SPI_curid++; |
||||
if ( _SPI_current != &(_SPI_stack[_SPI_curid]) ) |
||||
elog (FATAL, "SPI: stack corrupted"); |
||||
|
||||
if ( execmem ) /* switch to the Executor memory context */ |
||||
_SPI_execmem (); |
||||
|
||||
return (0); |
||||
} |
||||
|
||||
static int |
||||
_SPI_end_call (bool exfree, bool procmem) |
||||
{ |
||||
/*
|
||||
* We' returning to procedure where _SPI_curid == _SPI_connected - 1 |
||||
*/ |
||||
_SPI_curid--; |
||||
|
||||
if ( exfree ) /* free SPI_exec allocations */ |
||||
{ |
||||
free (_SPI_current->qtlist->qtrees); |
||||
free (_SPI_current->qtlist); |
||||
_SPI_current->qtlist = NULL; |
||||
} |
||||
|
||||
if ( procmem ) /* switch to the procedure memory context */ |
||||
{ /* but free Executor memory before */ |
||||
EndPortalAllocMode (); |
||||
_SPI_procmem (); |
||||
} |
||||
|
||||
return (0); |
||||
} |
||||
|
||||
static bool |
||||
_SPI_checktuples (bool isRetrieveIntoRelation) |
||||
{ |
||||
uint32 processed = _SPI_current->processed; |
||||
SPITupleTable *tuptable = _SPI_current->tuptable; |
||||
bool failed = false; |
||||
|
||||
if ( processed == 0 ) |
||||
{ |
||||
if ( tuptable != NULL ) |
||||
failed = true; |
||||
} |
||||
else /* some tuples were processed */ |
||||
{ |
||||
if ( tuptable == NULL ) /* spi_printtup was not called */ |
||||
{ |
||||
if ( !isRetrieveIntoRelation ) |
||||
failed = true; |
||||
} |
||||
else if ( isRetrieveIntoRelation ) |
||||
failed = true; |
||||
else if ( processed != ( tuptable->alloced - tuptable->free ) ) |
||||
failed = true; |
||||
} |
||||
|
||||
return (failed); |
||||
} |
||||
@ -0,0 +1,69 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* spi.h-- |
||||
*
|
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#ifndef SPI_H |
||||
#define SPI_H |
||||
|
||||
#include <string.h> |
||||
#include "postgres.h" |
||||
#include "nodes/primnodes.h" |
||||
#include "nodes/relation.h" |
||||
#include "nodes/execnodes.h" |
||||
#include "nodes/plannodes.h" |
||||
#include "catalog/pg_proc.h" |
||||
#include "parser/parse_query.h" |
||||
#include "tcop/pquery.h" |
||||
#include "tcop/tcopprot.h" |
||||
#include "tcop/utility.h" |
||||
#include "tcop/dest.h" |
||||
#include "nodes/params.h" |
||||
#include "utils/fcache.h" |
||||
#include "utils/datum.h" |
||||
#include "utils/elog.h" |
||||
#include "utils/palloc.h" |
||||
#include "utils/syscache.h" |
||||
#include "utils/mcxt.h" |
||||
#include "utils/portal.h" |
||||
#include "catalog/pg_language.h" |
||||
#include "access/heapam.h" |
||||
#include "access/xact.h" |
||||
#include "executor/executor.h" |
||||
#include "executor/execdefs.h" |
||||
|
||||
typedef struct { |
||||
uint32 alloced; /* # of alloced vals */ |
||||
uint32 free; /* # of free vals */ |
||||
TupleDesc tupdesc; /* tuple descriptor */ |
||||
HeapTuple *vals; /* tuples */ |
||||
} SPITupleTable; |
||||
|
||||
#define SPI_ERROR_CONNECT -1 |
||||
#define SPI_ERROR_COPY -2 |
||||
#define SPI_ERROR_OPUNKNOWN -3 |
||||
#define SPI_ERROR_UNCONNECTED -4 |
||||
#define SPI_ERROR_CURSOR -5 |
||||
#define SPI_ERROR_TRANSACTION -6 |
||||
|
||||
#define SPI_OK_CONNECT 0 |
||||
#define SPI_OK_FINISH 1 |
||||
#define SPI_OK_FETCH 2 |
||||
#define SPI_OK_UTILITY 3 |
||||
#define SPI_OK_SELECT 4 |
||||
#define SPI_OK_SELINTO 5 |
||||
#define SPI_OK_INSERT 6 |
||||
#define SPI_OK_DELETE 7 |
||||
#define SPI_OK_UPDATE 8 |
||||
#define SPI_OK_CURSOR 9 |
||||
|
||||
extern uint32 SPI_processed; |
||||
extern SPITupleTable *SPI_tuptable; |
||||
|
||||
extern int SPI_connect (void); |
||||
extern int SPI_finish (void); |
||||
extern int SPI_exec (char *src); |
||||
|
||||
#endif /* SPI_H */ |
||||
Loading…
Reference in new issue