@@ -3605,31 +3605,31 @@ get_remote_estimate(const char *sql, PGconn *conn,
                    Cost *startup_cost, Cost *total_cost)
{
    PGresult   *res;
    char       *line;
    char       *p;
    int         n;

    /*
     * Execute EXPLAIN remotely.
     */
    res = pgfdw_exec_query(conn, sql, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
        pgfdw_report_error(ERROR, res, conn, sql);

    /*
     * Extract cost numbers for topmost plan node.  Note we search for a left
     * paren from the end of the line to avoid being confused by other uses of
     * parentheses.
     */
    line = PQgetvalue(res, 0, 0);
    p = strrchr(line, '(');
    if (p == NULL)
        elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
    n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
               startup_cost, total_cost, rows, width);
    if (n != 4)
        elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);

    PQclear(res);
}
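For reference, here is a minimal standalone sketch (not part of the patch) of the parsing that get_remote_estimate performs: it scans back to the last left paren of the topmost EXPLAIN line and pulls the four cost fields out with sscanf. The sample plan line is illustrative.

#include <stdio.h>
#include <string.h>

int
main(void)
{
    /* Illustrative EXPLAIN text line; the real one comes from the remote server. */
    const char *line = "Seq Scan on t1  (cost=0.00..35.50 rows=2550 width=4)";
    const char *p = strrchr(line, '(');
    double      startup_cost,
                total_cost,
                rows;
    int         width;

    if (p != NULL &&
        sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
               &startup_cost, &total_cost, &rows, &width) == 4)
        printf("startup=%.2f total=%.2f rows=%.0f width=%d\n",
               startup_cost, total_cost, rows, width);
    return 0;
}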
@@ -3812,63 +3812,63 @@ fetch_more_data(ForeignScanState *node)
    MemoryContextReset(fsstate->batch_cxt);
    oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);

    if (fsstate->async_capable)
    {
        Assert(fsstate->conn_state->pendingAreq);

        /*
         * The query was already sent by an earlier call to
         * fetch_more_data_begin.  So now we just fetch the result.
         */
        res = pgfdw_get_result(conn);
        /* On error, report the original query, not the FETCH. */
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, fsstate->query);

        /* Reset per-connection state */
        fsstate->conn_state->pendingAreq = NULL;
    }
    else
    {
        char        sql[64];

        /* This is a regular synchronous fetch. */
        snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                 fsstate->fetch_size, fsstate->cursor_number);

        res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
        /* On error, report the original query, not the FETCH. */
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, fsstate->query);
    }

    /* Convert the data into HeapTuples */
    numrows = PQntuples(res);
    fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
    fsstate->num_tuples = numrows;
    fsstate->next_tuple = 0;

    for (i = 0; i < numrows; i++)
    {
        Assert(IsA(node->ss.ps.plan, ForeignScan));

        fsstate->tuples[i] =
            make_tuple_from_result_row(res, i,
                                       fsstate->rel,
                                       fsstate->attinmeta,
                                       fsstate->retrieved_attrs,
                                       node,
                                       fsstate->temp_cxt);
    }

    /* Update fetch_ct_2 */
    if (fsstate->fetch_ct_2 < 2)
        fsstate->fetch_ct_2++;

    /* Must be EOF if we didn't get as many tuples as we asked for. */
    fsstate->eof_reached = (numrows < fsstate->fetch_size);

    PQclear(res);

    MemoryContextSwitchTo(oldcontext);
}
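As a rough illustration of the asynchronous branch above: fetch_more_data_begin sends the FETCH without waiting, and pgfdw_get_result later collects the reply. The minimal standalone libpq sketch below (not part of the patch) shows the underlying send-then-collect pattern; the connection string and query are illustrative, and the real code additionally waits on the socket and honors interrupts.

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");    /* illustrative */
    PGresult   *res;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* Send the query without blocking for the result. */
    if (!PQsendQuery(conn, "SELECT count(*) FROM pg_class"))
        fprintf(stderr, "send failed: %s", PQerrorMessage(conn));

    /* Later: drain results until libpq returns NULL. */
    while ((res = PQgetResult(conn)) != NULL)
    {
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
            printf("got %d row(s)\n", PQntuples(res));
        PQclear(res);
    }

    PQfinish(conn);
    return 0;
}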
@@ -4322,20 +4322,20 @@ static void
store_returning_result(PgFdwModifyState *fmstate,
                       TupleTableSlot *slot, PGresult *res)
{
    HeapTuple   newtup;

    newtup = make_tuple_from_result_row(res, 0,
                                        fmstate->rel,
                                        fmstate->attinmeta,
                                        fmstate->retrieved_attrs,
                                        NULL,
                                        fmstate->temp_cxt);

    /*
     * The returning slot will not necessarily be suitable to store heaptuples
     * directly, so allow for conversion.
     */
    ExecForceStoreHeapTuple(newtup, slot, true);
}
@@ -4921,14 +4921,14 @@ postgresAnalyzeForeignTable(Relation relation,
    initStringInfo(&sql);
    deparseAnalyzeSizeSql(&sql, relation);

    res = pgfdw_exec_query(conn, sql.data, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
        pgfdw_report_error(ERROR, res, conn, sql.data);

    if (PQntuples(res) != 1 || PQnfields(res) != 1)
        elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
    *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);

    PQclear(res);

    ReleaseConnection(conn);
@@ -4970,15 +4970,15 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
    initStringInfo(&sql);
    deparseAnalyzeInfoSql(&sql, relation);

    res = pgfdw_exec_query(conn, sql.data, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
        pgfdw_report_error(ERROR, res, conn, sql.data);

    if (PQntuples(res) != 1 || PQnfields(res) != 2)
        elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
    reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
    relkind = *(PQgetvalue(res, 0, 1));

    PQclear(res);

    ReleaseConnection(conn);
@@ -5200,76 +5200,76 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
    deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);

    res = pgfdw_exec_query(conn, sql.data, NULL);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, conn, sql.data);
    PQclear(res);

    /*
     * Determine the fetch size.  The default is arbitrary, but shouldn't be
     * enormous.
     */
    fetch_size = 100;
    foreach(lc, server->options)
    {
        DefElem    *def = (DefElem *) lfirst(lc);

        if (strcmp(def->defname, "fetch_size") == 0)
        {
            (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
            break;
        }
    }
    foreach(lc, table->options)
    {
        DefElem    *def = (DefElem *) lfirst(lc);

        if (strcmp(def->defname, "fetch_size") == 0)
        {
            (void) parse_int(defGetString(def), &fetch_size, 0, NULL);
            break;
        }
    }

    /* Construct command to fetch rows from remote. */
    snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
             fetch_size, cursor_number);

    /* Retrieve and process rows a batch at a time. */
    for (;;)
    {
        int         numrows;
        int         i;

        /* Allow users to cancel long query */
        CHECK_FOR_INTERRUPTS();

        /*
         * XXX possible future improvement: if rowstoskip is large, we could
         * issue a MOVE rather than physically fetching the rows, then just
         * adjust rowstoskip and samplerows appropriately.
         */

        /* Fetch some rows */
        res = pgfdw_exec_query(conn, fetch_sql, NULL);
        /* On error, report the original query, not the FETCH. */
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, sql.data);

        /* Process whatever we got. */
        numrows = PQntuples(res);
        for (i = 0; i < numrows; i++)
            analyze_row_processor(res, i, &astate);

        PQclear(res);

        /* Must be EOF if we didn't get all the rows requested. */
        if (numrows < fetch_size)
            break;
    }

    /* Close the cursor, just to be tidy. */
    close_cursor(conn, cursor_number, NULL);

    ReleaseConnection(conn);
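The sampling loop above is the usual libpq cursor pattern: declare a cursor, FETCH one batch at a time, and stop when a batch comes back short. A minimal standalone sketch of that pattern follows (not part of the patch; connection string, cursor name, and query are illustrative, and the real code goes through pgfdw_exec_query so that errors are reported against the original query).

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn     *conn = PQconnectdb("dbname=postgres");    /* illustrative */
    PGresult   *res;
    int         fetch_size = 100;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }

    /* Cursors only live inside a transaction block. */
    PQclear(PQexec(conn, "BEGIN"));
    PQclear(PQexec(conn, "DECLARE c1 CURSOR FOR SELECT relname FROM pg_class"));

    /* Retrieve and process rows a batch at a time. */
    for (;;)
    {
        int         numrows;

        res = PQexec(conn, "FETCH 100 FROM c1");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr, "FETCH failed: %s", PQerrorMessage(conn));
            PQclear(res);
            break;
        }

        numrows = PQntuples(res);
        PQclear(res);

        /* Must be EOF if we didn't get all the rows requested. */
        if (numrows < fetch_size)
            break;
    }

    PQclear(PQexec(conn, "CLOSE c1"));
    PQclear(PQexec(conn, "COMMIT"));
    PQfinish(conn);
    return 0;
}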
@@ -5420,234 +5420,231 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
    /* Create workspace for strings */
    initStringInfo(&buf);

    /* Check that the schema really exists */
    appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
    deparseStringLiteral(&buf, stmt->remote_schema);

    res = pgfdw_exec_query(conn, buf.data, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
        pgfdw_report_error(ERROR, res, conn, buf.data);

    if (PQntuples(res) != 1)
        ereport(ERROR,
                (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
                 errmsg("schema \"%s\" is not present on foreign server \"%s\"",
                        stmt->remote_schema, server->servername)));

    PQclear(res);
    resetStringInfo(&buf);

    /*
     * Fetch all table data from this schema, possibly restricted by EXCEPT or
     * LIMIT TO.  (We don't actually need to pay any attention to EXCEPT/LIMIT
     * TO here, because the core code will filter the statements we return
     * according to those lists anyway.  But it should save a few cycles to
     * not process excluded tables in the first place.)
     *
     * Import table data for partitions only when they are explicitly
     * specified in LIMIT TO clause. Otherwise ignore them and only include
     * the definitions of the root partitioned tables to allow access to the
     * complete remote data set locally in the schema imported.
     *
     * Note: because we run the connection with search_path restricted to
     * pg_catalog, the format_type() and pg_get_expr() outputs will always
     * include a schema name for types/functions in other schemas, which is
     * what we want.
     */
    appendStringInfoString(&buf,
                           "SELECT relname, "
                           "  attname, "
                           "  format_type(atttypid, atttypmod), "
                           "  attnotnull, "
                           "  pg_get_expr(adbin, adrelid), ");

    /* Generated columns are supported since Postgres 12 */
    if (PQserverVersion(conn) >= 120000)
        appendStringInfoString(&buf,
                               "  attgenerated, ");
    else
        appendStringInfoString(&buf,
                               "  NULL, ");

    if (import_collate)
        appendStringInfoString(&buf,
                               "  collname, "
                               "  collnsp.nspname ");
    else
        appendStringInfoString(&buf,
                               "  NULL, NULL ");

    appendStringInfoString(&buf,
                           "FROM pg_class c "
                           "  JOIN pg_namespace n ON "
                           "    relnamespace = n.oid "
                           "  LEFT JOIN pg_attribute a ON "
                           "    attrelid = c.oid AND attnum > 0 "
                           "      AND NOT attisdropped "
                           "  LEFT JOIN pg_attrdef ad ON "
                           "    adrelid = c.oid AND adnum = attnum ");

    if (import_collate)
        appendStringInfoString(&buf,
                               "  LEFT JOIN pg_collation coll ON "
                               "    coll.oid = attcollation "
                               "  LEFT JOIN pg_namespace collnsp ON "
                               "    collnsp.oid = collnamespace ");

    appendStringInfoString(&buf,
                           "WHERE c.relkind IN ("
                           CppAsString2(RELKIND_RELATION) ","
                           CppAsString2(RELKIND_VIEW) ","
                           CppAsString2(RELKIND_FOREIGN_TABLE) ","
                           CppAsString2(RELKIND_MATVIEW) ","
                           CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
                           "  AND n.nspname = ");
    deparseStringLiteral(&buf, stmt->remote_schema);

    /* Partitions are supported since Postgres 10 */
    if (PQserverVersion(conn) >= 100000 &&
        stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
        appendStringInfoString(&buf, " AND NOT c.relispartition ");

    /* Apply restrictions for LIMIT TO and EXCEPT */
    if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
        stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
    {
        bool        first_item = true;

        appendStringInfoString(&buf, " AND c.relname ");
        if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
            appendStringInfoString(&buf, "NOT ");
        appendStringInfoString(&buf, "IN (");

        /* Append list of table names within IN clause */
        foreach(lc, stmt->table_list)
        {
            RangeVar   *rv = (RangeVar *) lfirst(lc);

            if (first_item)
                first_item = false;
            else
                appendStringInfoString(&buf, ", ");
            deparseStringLiteral(&buf, rv->relname);
        }
        appendStringInfoChar(&buf, ')');
    }

    /* Append ORDER BY at the end of query to ensure output ordering */
    appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");

    /* Fetch the data */
    res = pgfdw_exec_query(conn, buf.data, NULL);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
        pgfdw_report_error(ERROR, res, conn, buf.data);

    /* Process results */
    numrows = PQntuples(res);
    /* note: incrementation of i happens in inner loop's while() test */
    for (i = 0; i < numrows;)
    {
        char       *tablename = PQgetvalue(res, i, 0);
        bool        first_item = true;

        resetStringInfo(&buf);
        appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
                         quote_identifier(tablename));

        /* Scan all rows for this table */
        do
        {
            char       *attname;
            char       *typename;
            char       *attnotnull;
            char       *attgenerated;
            char       *attdefault;
            char       *collname;
            char       *collnamespace;

            /* If table has no columns, we'll see nulls here */
            if (PQgetisnull(res, i, 1))
                continue;

            attname = PQgetvalue(res, i, 1);
            typename = PQgetvalue(res, i, 2);
            attnotnull = PQgetvalue(res, i, 3);
            attdefault = PQgetisnull(res, i, 4) ? NULL :
                PQgetvalue(res, i, 4);
            attgenerated = PQgetisnull(res, i, 5) ? NULL :
                PQgetvalue(res, i, 5);
            collname = PQgetisnull(res, i, 6) ? NULL :
                PQgetvalue(res, i, 6);
            collnamespace = PQgetisnull(res, i, 7) ? NULL :
                PQgetvalue(res, i, 7);

            if (first_item)
                first_item = false;
            else
                appendStringInfoString(&buf, ",\n");

            /* Print column name and type */
            appendStringInfo(&buf, "  %s %s",
                             quote_identifier(attname),
                             typename);

            /*
             * Add column_name option so that renaming the foreign table's
             * column doesn't break the association to the underlying column.
             */
            appendStringInfoString(&buf, " OPTIONS (column_name ");
            deparseStringLiteral(&buf, attname);
            appendStringInfoChar(&buf, ')');

            /* Add COLLATE if needed */
            if (import_collate && collname != NULL && collnamespace != NULL)
                appendStringInfo(&buf, " COLLATE %s.%s",
                                 quote_identifier(collnamespace),
                                 quote_identifier(collname));

            /* Add DEFAULT if needed */
            if (import_default && attdefault != NULL &&
                (!attgenerated || !attgenerated[0]))
                appendStringInfo(&buf, " DEFAULT %s", attdefault);

            /* Add GENERATED if needed */
            if (import_generated && attgenerated != NULL &&
                attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
            {
                Assert(attdefault != NULL);
                appendStringInfo(&buf,
                                 " GENERATED ALWAYS AS (%s) STORED",
                                 attdefault);
            }

            /* Add NOT NULL if needed */
            if (import_not_null && attnotnull[0] == 't')
                appendStringInfoString(&buf, " NOT NULL");
        }
        while (++i < numrows &&
               strcmp(PQgetvalue(res, i, 0), tablename) == 0);

        /*
         * Add server name and table-level options.  We specify remote schema
         * and table name as options (the latter to ensure that renaming the
         * foreign table doesn't break the association).
         */
        appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
                         quote_identifier(server->servername));

        appendStringInfoString(&buf, "schema_name ");
        deparseStringLiteral(&buf, stmt->remote_schema);
        appendStringInfoString(&buf, ", table_name ");
        deparseStringLiteral(&buf, tablename);

        appendStringInfoString(&buf, ");");

        commands = lappend(commands, pstrdup(buf.data));
    }
    PQclear(res);

    ReleaseConnection(conn);
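For illustration only (not output produced by this patch): for a hypothetical remote table public.t1 with columns id integer not null and val text, served by a hypothetical foreign server myserver, the loop above would assemble a command along these lines, which the core IMPORT FOREIGN SCHEMA machinery then executes locally.

CREATE FOREIGN TABLE t1 (
  id integer OPTIONS (column_name 'id') NOT NULL,
  val text OPTIONS (column_name 'val')
) SERVER myserver
OPTIONS (schema_name 'public', table_name 't1');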