diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 56374905f5b..19debfb5c9e 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -285,6 +285,8 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate, static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); +static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, + Oid serverOid); /* * Helper functions @@ -362,6 +364,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; + /* Support functions for IMPORT FOREIGN SCHEMA */ + routine->ImportForeignSchema = postgresImportForeignSchema; + PG_RETURN_POINTER(routine); } @@ -2564,6 +2569,270 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) } /* + * Import a foreign schema + */ +static List * +postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) +{ + List *commands = NIL; + bool import_collate = true; + bool import_default = false; + bool import_not_null = true; + ForeignServer *server; + UserMapping *mapping; + PGconn *conn; + StringInfoData buf; + PGresult *volatile res = NULL; + int numrows, + i; + ListCell *lc; + + /* Parse statement options */ + foreach(lc, stmt->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "import_collate") == 0) + import_collate = defGetBoolean(def); + else if (strcmp(def->defname, "import_default") == 0) + import_default = defGetBoolean(def); + else if (strcmp(def->defname, "import_not_null") == 0) + import_not_null = defGetBoolean(def); + else + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("invalid option \"%s\"", def->defname))); + } + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + server = GetForeignServer(serverOid); + mapping = GetUserMapping(GetUserId(), server->serverid); + conn = GetConnection(server, mapping, false); + + /* Don't attempt to import collation if remote server hasn't got it */ + if (PQserverVersion(conn) < 90100) + import_collate = false; + + /* Create workspace for strings */ + initStringInfo(&buf); + + /* In what follows, do not risk leaking any PGresults. */ + PG_TRY(); + { + /* Check that the schema really exists */ + appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); + deparseStringLiteral(&buf, stmt->remote_schema); + + res = PQexec(conn, buf.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, buf.data); + + if (PQntuples(res) != 1) + ereport(ERROR, + (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), + errmsg("schema \"%s\" is not present on foreign server \"%s\"", + stmt->remote_schema, server->servername))); + + PQclear(res); + res = NULL; + resetStringInfo(&buf); + + /* + * Fetch all table data from this schema, possibly restricted by + * EXCEPT or LIMIT TO. + * + * Note: because we run the connection with search_path restricted to + * pg_catalog, the format_type() and pg_get_expr() outputs will always + * include a schema name for types/functions in other schemas, which + * is what we want. + */ + if (import_collate) + appendStringInfoString(&buf, + "SELECT relname, " + " attname, " + " format_type(atttypid, atttypmod), " + " attnotnull, " + " pg_get_expr(adbin, adrelid), " + " collname, " + " collnsp.nspname " + "FROM pg_class c " + " JOIN pg_namespace n ON " + " relnamespace = n.oid " + " LEFT JOIN pg_attribute a ON " + " attrelid = c.oid AND attnum > 0 " + " AND NOT attisdropped " + " LEFT JOIN pg_attrdef ad ON " + " adrelid = c.oid AND adnum = attnum " + " LEFT JOIN pg_collation coll ON " + " coll.oid = attcollation " + " LEFT JOIN pg_namespace collnsp ON " + " collnsp.oid = collnamespace "); + else + appendStringInfoString(&buf, + "SELECT relname, " + " attname, " + " format_type(atttypid, atttypmod), " + " attnotnull, " + " pg_get_expr(adbin, adrelid), " + " NULL, NULL " + "FROM pg_class c " + " JOIN pg_namespace n ON " + " relnamespace = n.oid " + " LEFT JOIN pg_attribute a ON " + " attrelid = c.oid AND attnum > 0 " + " AND NOT attisdropped " + " LEFT JOIN pg_attrdef ad ON " + " adrelid = c.oid AND adnum = attnum "); + + appendStringInfoString(&buf, + "WHERE c.relkind IN ('r', 'v', 'f', 'm') " + " AND n.nspname = "); + deparseStringLiteral(&buf, stmt->remote_schema); + + /* Apply restrictions for LIMIT TO and EXCEPT */ + if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || + stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + { + bool first_item = true; + + appendStringInfoString(&buf, " AND c.relname "); + if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + appendStringInfoString(&buf, "NOT "); + appendStringInfoString(&buf, "IN ("); + + /* Append list of table names within IN clause */ + foreach(lc, stmt->table_list) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ", "); + deparseStringLiteral(&buf, rv->relname); + } + appendStringInfoString(&buf, ")"); + } + + /* Append ORDER BY at the end of query to ensure output ordering */ + appendStringInfo(&buf, " ORDER BY c.relname, a.attnum"); + + /* Fetch the data */ + res = PQexec(conn, buf.data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, buf.data); + + /* Process results */ + numrows = PQntuples(res); + /* note: incrementation of i happens in inner loop's while() test */ + for (i = 0; i < numrows;) + { + char *tablename = PQgetvalue(res, i, 0); + bool first_item = true; + + resetStringInfo(&buf); + appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", + quote_identifier(tablename)); + + /* Scan all rows for this table */ + do + { + char *attname; + char *typename; + char *attnotnull; + char *attdefault; + char *collname; + char *collnamespace; + + /* If table has no columns, we'll see nulls here */ + if (PQgetisnull(res, i, 1)) + continue; + + attname = PQgetvalue(res, i, 1); + typename = PQgetvalue(res, i, 2); + attnotnull = PQgetvalue(res, i, 3); + attdefault = PQgetisnull(res, i, 4) ? (char *) NULL : + PQgetvalue(res, i, 4); + collname = PQgetisnull(res, i, 5) ? (char *) NULL : + PQgetvalue(res, i, 5); + collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL : + PQgetvalue(res, i, 6); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ",\n"); + + /* Print column name and type */ + appendStringInfo(&buf, " %s %s", + quote_identifier(attname), + typename); + + /* + * Add column_name option so that renaming the foreign table's + * column doesn't break the association to the underlying + * column. + */ + appendStringInfoString(&buf, " OPTIONS (column_name "); + deparseStringLiteral(&buf, attname); + appendStringInfoString(&buf, ")"); + + /* Add COLLATE if needed */ + if (import_collate && collname != NULL && collnamespace != NULL) + appendStringInfo(&buf, " COLLATE %s.%s", + quote_identifier(collnamespace), + quote_identifier(collname)); + + /* Add DEFAULT if needed */ + if (import_default && attdefault != NULL) + appendStringInfo(&buf, " DEFAULT %s", attdefault); + + /* Add NOT NULL if needed */ + if (import_not_null && attnotnull[0] == 't') + appendStringInfoString(&buf, " NOT NULL"); + } + while (++i < numrows && + strcmp(PQgetvalue(res, i, 0), tablename) == 0); + + /* + * Add server name and table-level options. We specify remote + * schema and table name as options (the latter to ensure that + * renaming the foreign table doesn't break the association). + */ + appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (", + quote_identifier(server->servername)); + + appendStringInfoString(&buf, "schema_name "); + deparseStringLiteral(&buf, stmt->remote_schema); + appendStringInfoString(&buf, ", table_name "); + deparseStringLiteral(&buf, tablename); + + appendStringInfoString(&buf, ");"); + + commands = lappend(commands, pstrdup(buf.data)); + } + + /* Clean up */ + PQclear(res); + res = NULL; + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); + + ReleaseConnection(conn); + + return commands; +} + +/* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is |