diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 155 |
1 files changed, 88 insertions, 67 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 982a8d9a61f..687b87b8604 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -84,9 +84,10 @@ typedef struct PgFdwRelationInfo * Indexes of FDW-private information stored in fdw_private lists. * * We store various information in ForeignScan.fdw_private to pass it from - * planner to executor. Currently there is just: + * planner to executor. Currently we store: * * 1) SELECT statement text to be sent to the remote server + * 2) Integer list of attribute numbers retrieved by the SELECT * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT statement: @@ -95,7 +96,9 @@ typedef struct PgFdwRelationInfo enum FdwScanPrivateIndex { /* SQL statement to execute remotely (as a String node) */ - FdwScanPrivateSelectSql + FdwScanPrivateSelectSql, + /* Integer list of attribute numbers retrieved by the SELECT */ + FdwScanPrivateRetrievedAttrs }; /* @@ -106,6 +109,7 @@ enum FdwScanPrivateIndex * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) * 3) Boolean flag showing if there's a RETURNING clause + * 4) Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { @@ -114,7 +118,9 @@ enum FdwModifyPrivateIndex /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, /* has-returning flag (as an integer Value node) */ - FdwModifyPrivateHasReturning + FdwModifyPrivateHasReturning, + /* Integer list of attribute numbers retrieved by RETURNING */ + FdwModifyPrivateRetrievedAttrs }; /* @@ -125,7 +131,9 @@ typedef struct PgFdwScanState Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ - List *fdw_private; /* FDW-private information from planner */ + /* extracted fdw_private data */ + char *query; /* text of SELECT command */ + List *retrieved_attrs; /* list of retrieved attribute numbers */ /* for remote query execution */ PGconn *conn; /* connection for the scan */ @@ -166,6 +174,7 @@ typedef struct PgFdwModifyState char *query; /* text of INSERT/UPDATE/DELETE command */ List *target_attrs; /* list of target attribute numbers */ bool has_returning; /* is there a RETURNING clause? */ + List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ @@ -183,6 +192,7 @@ typedef struct PgFdwAnalyzeState { Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + List *retrieved_attrs; /* attr numbers retrieved by query */ /* collected sample rows */ HeapTuple *rows; /* array of size targrows */ @@ -314,6 +324,7 @@ static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, + List *retrieved_attrs, MemoryContext temp_context); static void conversion_error_callback(void *arg); @@ -728,6 +739,7 @@ postgresGetForeignPlan(PlannerInfo *root, List *remote_conds = NIL; List *local_exprs = NIL; List *params_list = NIL; + List *retrieved_attrs; StringInfoData sql; ListCell *lc; @@ -777,7 +789,8 @@ postgresGetForeignPlan(PlannerInfo *root, * expressions to be sent as parameters. */ initStringInfo(&sql); - deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used); + deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used, + &retrieved_attrs); if (remote_conds) appendWhereClause(&sql, root, baserel, remote_conds, true, ¶ms_list); @@ -829,7 +842,8 @@ postgresGetForeignPlan(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ - fdw_private = list_make1(makeString(sql.data)); + fdw_private = list_make2(makeString(sql.data), + retrieved_attrs); /* * Create the ForeignScan node from target list, local filtering @@ -901,7 +915,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->cursor_exists = false; /* Get private info created by planner functions. */ - fsstate->fdw_private = fsplan->fdw_private; + fsstate->query = strVal(list_nth(fsplan->fdw_private, + FdwScanPrivateSelectSql)); + fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwScanPrivateRetrievedAttrs); /* Create contexts for batches of tuples and per-tuple temp workspace. */ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, @@ -915,7 +932,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); - /* Get info we'll need for data conversion. */ + /* Get info we'll need for input data conversion. */ fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); /* Prepare for output conversion of parameters used in remote query. */ @@ -1138,6 +1155,7 @@ postgresPlanForeignModify(PlannerInfo *root, StringInfoData sql; List *targetAttrs = NIL; List *returningList = NIL; + List *retrieved_attrs = NIL; initStringInfo(&sql); @@ -1194,15 +1212,18 @@ postgresPlanForeignModify(PlannerInfo *root, { case CMD_INSERT: deparseInsertSql(&sql, root, resultRelation, rel, - targetAttrs, returningList); + targetAttrs, returningList, + &retrieved_attrs); break; case CMD_UPDATE: deparseUpdateSql(&sql, root, resultRelation, rel, - targetAttrs, returningList); + targetAttrs, returningList, + &retrieved_attrs); break; case CMD_DELETE: deparseDeleteSql(&sql, root, resultRelation, rel, - returningList); + returningList, + &retrieved_attrs); break; default: elog(ERROR, "unexpected operation: %d", (int) operation); @@ -1215,9 +1236,10 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make3(makeString(sql.data), + return list_make4(makeString(sql.data), targetAttrs, - makeInteger((returningList != NIL))); + makeInteger((returningList != NIL)), + retrieved_attrs); } /* @@ -1279,6 +1301,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateTargetAttnums); fmstate->has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); + fmstate->retrieved_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateRetrievedAttrs); /* Create context for per-tuple temp workspace. */ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, @@ -1641,6 +1665,7 @@ estimate_path_cost_size(PlannerInfo *root, if (fpinfo->use_remote_estimate) { StringInfoData sql; + List *retrieved_attrs; PGconn *conn; /* @@ -1650,7 +1675,8 @@ estimate_path_cost_size(PlannerInfo *root, */ initStringInfo(&sql); appendStringInfoString(&sql, "EXPLAIN "); - deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used); + deparseSelectSql(&sql, root, baserel, fpinfo->attrs_used, + &retrieved_attrs); if (fpinfo->remote_conds) appendWhereClause(&sql, root, baserel, fpinfo->remote_conds, true, NULL); @@ -1819,7 +1845,6 @@ create_cursor(ForeignScanState *node) int numParams = fsstate->numParams; const char **values = fsstate->param_values; PGconn *conn = fsstate->conn; - char *sql; StringInfoData buf; PGresult *res; @@ -1867,10 +1892,9 @@ create_cursor(ForeignScanState *node) } /* Construct the DECLARE CURSOR command */ - sql = strVal(list_nth(fsstate->fdw_private, FdwScanPrivateSelectSql)); initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", - fsstate->cursor_number, sql); + fsstate->cursor_number, fsstate->query); /* * Notice that we pass NULL for paramTypes, thus forcing the remote server @@ -1885,7 +1909,7 @@ create_cursor(ForeignScanState *node) res = PQexecParams(conn, buf.data, numParams, NULL, values, NULL, NULL, 0); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, true, sql); + pgfdw_report_error(ERROR, res, true, fsstate->query); PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ @@ -1936,9 +1960,7 @@ fetch_more_data(ForeignScanState *node) res = PQexec(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, false, - strVal(list_nth(fsstate->fdw_private, - FdwScanPrivateSelectSql))); + pgfdw_report_error(ERROR, res, false, fsstate->query); /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -1952,6 +1974,7 @@ fetch_more_data(ForeignScanState *node) make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, + fsstate->retrieved_attrs, fsstate->temp_cxt); } @@ -2170,6 +2193,7 @@ store_returning_result(PgFdwModifyState *fmstate, newtup = make_tuple_from_result_row(res, 0, fmstate->rel, fmstate->attinmeta, + fmstate->retrieved_attrs, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); @@ -2316,7 +2340,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, cursor_number = GetCursorNumber(conn); initStringInfo(&sql); appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); - deparseAnalyzeSql(&sql, relation); + deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs); /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); @@ -2461,6 +2485,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) astate->rows[pos] = make_tuple_from_result_row(res, row, astate->rel, astate->attinmeta, + astate->retrieved_attrs, astate->temp_cxt); MemoryContextSwitchTo(oldcontext); @@ -2471,26 +2496,27 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is - * conversion data for the rel's tupdesc, and temp_context is a working - * context that can be reset after each tuple. + * conversion data for the rel's tupdesc, and retrieved_attrs is an + * integer list of the table column numbers present in the PGresult. + * temp_context is a working context that can be reset after each tuple. */ static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, AttInMetadata *attinmeta, + List *retrieved_attrs, MemoryContext temp_context) { HeapTuple tuple; TupleDesc tupdesc = RelationGetDescr(rel); - Form_pg_attribute *attrs = tupdesc->attrs; Datum *values; bool *nulls; ItemPointer ctid = NULL; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; - int i; + ListCell *lc; int j; Assert(row < PQntuples(res)); @@ -2502,8 +2528,10 @@ make_tuple_from_result_row(PGresult *res, */ oldcontext = MemoryContextSwitchTo(temp_context); - values = (Datum *) palloc(tupdesc->natts * sizeof(Datum)); + values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); + /* Initialize to nulls for any columns not present in result */ + memset(nulls, true, tupdesc->natts * sizeof(bool)); /* * Set up and install callback to report where conversion error occurs. @@ -2517,63 +2545,56 @@ make_tuple_from_result_row(PGresult *res, /* * i indexes columns in the relation, j indexes columns in the PGresult. - * We assume dropped columns are not represented in the PGresult. */ - for (i = 0, j = 0; i < tupdesc->natts; i++) + j = 0; + foreach(lc, retrieved_attrs) { + int i = lfirst_int(lc); char *valstr; - /* skip dropped columns. */ - if (attrs[i]->attisdropped) - { - values[i] = (Datum) 0; - nulls[i] = true; - continue; - } - - /* convert value to internal representation */ + /* fetch next column's textual value */ if (PQgetisnull(res, row, j)) - { valstr = NULL; - nulls[i] = true; - } else - { valstr = PQgetvalue(res, row, j); - nulls[i] = false; - } - - /* Note: apply the input function even to nulls, to support domains */ - errpos.cur_attno = i + 1; - values[i] = InputFunctionCall(&attinmeta->attinfuncs[i], - valstr, - attinmeta->attioparams[i], - attinmeta->atttypmods[i]); - errpos.cur_attno = 0; - j++; - } + /* convert value to internal representation */ + if (i > 0) + { + /* ordinary column */ + Assert(i <= tupdesc->natts); + nulls[i - 1] = (valstr == NULL); + /* Apply the input function even to nulls, to support domains */ + errpos.cur_attno = i; + values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], + valstr, + attinmeta->attioparams[i - 1], + attinmeta->atttypmods[i - 1]); + errpos.cur_attno = 0; + } + else if (i == SelfItemPointerAttributeNumber) + { + /* ctid --- note we ignore any other system column in result */ + if (valstr != NULL) + { + Datum datum; - /* - * Convert ctid if present. XXX we could stand to have a cleaner way of - * detecting whether ctid is included in the result. - */ - if (j < PQnfields(res)) - { - char *valstr; - Datum datum; + datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); + ctid = (ItemPointer) DatumGetPointer(datum); + } + } - valstr = PQgetvalue(res, row, j); - datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); - ctid = (ItemPointer) DatumGetPointer(datum); j++; } /* Uninstall error context callback. */ error_context_stack = errcallback.previous; - /* check result and tuple descriptor have the same number of columns */ - if (j != PQnfields(res)) + /* + * Check we got the expected number of columns. Note: j == 0 and + * PQnfields == 1 is expected, since deparse emits a NULL if no columns. + */ + if (j > 0 && j != PQnfields(res)) elog(ERROR, "remote query result does not match the foreign table"); /* |