aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2014-07-10 15:01:31 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2014-07-10 15:01:43 -0400
commit59efda3e50ca4de6a9d5aa4491464e22b6329b1e (patch)
tree23a2fe16ebc11ad9f95ef3f70c563084d6ca4007 /contrib/postgres_fdw/postgres_fdw.c
parent6a605cd6bd9f689b35676623add0de9b90978bf1 (diff)
downloadpostgresql-59efda3e50ca4de6a9d5aa4491464e22b6329b1e.tar.gz
postgresql-59efda3e50ca4de6a9d5aa4491464e22b6329b1e.zip
Implement IMPORT FOREIGN SCHEMA.
This command provides an automated way to create foreign table definitions that match remote tables, thereby reducing tedium and chances for error. In this patch, we provide the necessary core-server infrastructure and implement the feature fully in the postgres_fdw foreign-data wrapper. Other wrappers will throw a "feature not supported" error until/unless they are updated. Ronan Dunklau and Michael Paquier, additional work by me
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c269
1 files changed, 269 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 56374905f5b..19debfb5c9e 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -285,6 +285,8 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate,
static bool postgresAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
+static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
+ Oid serverOid);
/*
* Helper functions
@@ -362,6 +364,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for ANALYZE */
routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
+ /* Support functions for IMPORT FOREIGN SCHEMA */
+ routine->ImportForeignSchema = postgresImportForeignSchema;
+
PG_RETURN_POINTER(routine);
}
@@ -2564,6 +2569,270 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
}
/*
+ * Import a foreign schema
+ */
+static List *
+postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
+{
+ List *commands = NIL;
+ bool import_collate = true;
+ bool import_default = false;
+ bool import_not_null = true;
+ ForeignServer *server;
+ UserMapping *mapping;
+ PGconn *conn;
+ StringInfoData buf;
+ PGresult *volatile res = NULL;
+ int numrows,
+ i;
+ ListCell *lc;
+
+ /* Parse statement options */
+ foreach(lc, stmt->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "import_collate") == 0)
+ import_collate = defGetBoolean(def);
+ else if (strcmp(def->defname, "import_default") == 0)
+ import_default = defGetBoolean(def);
+ else if (strcmp(def->defname, "import_not_null") == 0)
+ import_not_null = defGetBoolean(def);
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
+ errmsg("invalid option \"%s\"", def->defname)));
+ }
+
+ /*
+ * Get connection to the foreign server. Connection manager will
+ * establish new connection if necessary.
+ */
+ server = GetForeignServer(serverOid);
+ mapping = GetUserMapping(GetUserId(), server->serverid);
+ conn = GetConnection(server, mapping, false);
+
+ /* Don't attempt to import collation if remote server hasn't got it */
+ if (PQserverVersion(conn) < 90100)
+ import_collate = false;
+
+ /* Create workspace for strings */
+ initStringInfo(&buf);
+
+ /* In what follows, do not risk leaking any PGresults. */
+ PG_TRY();
+ {
+ /* Check that the schema really exists */
+ appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
+ deparseStringLiteral(&buf, stmt->remote_schema);
+
+ res = PQexec(conn, buf.data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, buf.data);
+
+ if (PQntuples(res) != 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
+ errmsg("schema \"%s\" is not present on foreign server \"%s\"",
+ stmt->remote_schema, server->servername)));
+
+ PQclear(res);
+ res = NULL;
+ resetStringInfo(&buf);
+
+ /*
+ * Fetch all table data from this schema, possibly restricted by
+ * EXCEPT or LIMIT TO.
+ *
+ * Note: because we run the connection with search_path restricted to
+ * pg_catalog, the format_type() and pg_get_expr() outputs will always
+ * include a schema name for types/functions in other schemas, which
+ * is what we want.
+ */
+ if (import_collate)
+ appendStringInfoString(&buf,
+ "SELECT relname, "
+ " attname, "
+ " format_type(atttypid, atttypmod), "
+ " attnotnull, "
+ " pg_get_expr(adbin, adrelid), "
+ " collname, "
+ " collnsp.nspname "
+ "FROM pg_class c "
+ " JOIN pg_namespace n ON "
+ " relnamespace = n.oid "
+ " LEFT JOIN pg_attribute a ON "
+ " attrelid = c.oid AND attnum > 0 "
+ " AND NOT attisdropped "
+ " LEFT JOIN pg_attrdef ad ON "
+ " adrelid = c.oid AND adnum = attnum "
+ " LEFT JOIN pg_collation coll ON "
+ " coll.oid = attcollation "
+ " LEFT JOIN pg_namespace collnsp ON "
+ " collnsp.oid = collnamespace ");
+ else
+ appendStringInfoString(&buf,
+ "SELECT relname, "
+ " attname, "
+ " format_type(atttypid, atttypmod), "
+ " attnotnull, "
+ " pg_get_expr(adbin, adrelid), "
+ " NULL, NULL "
+ "FROM pg_class c "
+ " JOIN pg_namespace n ON "
+ " relnamespace = n.oid "
+ " LEFT JOIN pg_attribute a ON "
+ " attrelid = c.oid AND attnum > 0 "
+ " AND NOT attisdropped "
+ " LEFT JOIN pg_attrdef ad ON "
+ " adrelid = c.oid AND adnum = attnum ");
+
+ appendStringInfoString(&buf,
+ "WHERE c.relkind IN ('r', 'v', 'f', 'm') "
+ " AND n.nspname = ");
+ deparseStringLiteral(&buf, stmt->remote_schema);
+
+ /* Apply restrictions for LIMIT TO and EXCEPT */
+ if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
+ stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
+ {
+ bool first_item = true;
+
+ appendStringInfoString(&buf, " AND c.relname ");
+ if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
+ appendStringInfoString(&buf, "NOT ");
+ appendStringInfoString(&buf, "IN (");
+
+ /* Append list of table names within IN clause */
+ foreach(lc, stmt->table_list)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+
+ if (first_item)
+ first_item = false;
+ else
+ appendStringInfoString(&buf, ", ");
+ deparseStringLiteral(&buf, rv->relname);
+ }
+ appendStringInfoString(&buf, ")");
+ }
+
+ /* Append ORDER BY at the end of query to ensure output ordering */
+ appendStringInfo(&buf, " ORDER BY c.relname, a.attnum");
+
+ /* Fetch the data */
+ res = PQexec(conn, buf.data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, buf.data);
+
+ /* Process results */
+ numrows = PQntuples(res);
+ /* note: incrementation of i happens in inner loop's while() test */
+ for (i = 0; i < numrows;)
+ {
+ char *tablename = PQgetvalue(res, i, 0);
+ bool first_item = true;
+
+ resetStringInfo(&buf);
+ appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
+ quote_identifier(tablename));
+
+ /* Scan all rows for this table */
+ do
+ {
+ char *attname;
+ char *typename;
+ char *attnotnull;
+ char *attdefault;
+ char *collname;
+ char *collnamespace;
+
+ /* If table has no columns, we'll see nulls here */
+ if (PQgetisnull(res, i, 1))
+ continue;
+
+ attname = PQgetvalue(res, i, 1);
+ typename = PQgetvalue(res, i, 2);
+ attnotnull = PQgetvalue(res, i, 3);
+ attdefault = PQgetisnull(res, i, 4) ? (char *) NULL :
+ PQgetvalue(res, i, 4);
+ collname = PQgetisnull(res, i, 5) ? (char *) NULL :
+ PQgetvalue(res, i, 5);
+ collnamespace = PQgetisnull(res, i, 6) ? (char *) NULL :
+ PQgetvalue(res, i, 6);
+
+ if (first_item)
+ first_item = false;
+ else
+ appendStringInfoString(&buf, ",\n");
+
+ /* Print column name and type */
+ appendStringInfo(&buf, " %s %s",
+ quote_identifier(attname),
+ typename);
+
+ /*
+ * Add column_name option so that renaming the foreign table's
+ * column doesn't break the association to the underlying
+ * column.
+ */
+ appendStringInfoString(&buf, " OPTIONS (column_name ");
+ deparseStringLiteral(&buf, attname);
+ appendStringInfoString(&buf, ")");
+
+ /* Add COLLATE if needed */
+ if (import_collate && collname != NULL && collnamespace != NULL)
+ appendStringInfo(&buf, " COLLATE %s.%s",
+ quote_identifier(collnamespace),
+ quote_identifier(collname));
+
+ /* Add DEFAULT if needed */
+ if (import_default && attdefault != NULL)
+ appendStringInfo(&buf, " DEFAULT %s", attdefault);
+
+ /* Add NOT NULL if needed */
+ if (import_not_null && attnotnull[0] == 't')
+ appendStringInfoString(&buf, " NOT NULL");
+ }
+ while (++i < numrows &&
+ strcmp(PQgetvalue(res, i, 0), tablename) == 0);
+
+ /*
+ * Add server name and table-level options. We specify remote
+ * schema and table name as options (the latter to ensure that
+ * renaming the foreign table doesn't break the association).
+ */
+ appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
+ quote_identifier(server->servername));
+
+ appendStringInfoString(&buf, "schema_name ");
+ deparseStringLiteral(&buf, stmt->remote_schema);
+ appendStringInfoString(&buf, ", table_name ");
+ deparseStringLiteral(&buf, tablename);
+
+ appendStringInfoString(&buf, ");");
+
+ commands = lappend(commands, pstrdup(buf.data));
+ }
+
+ /* Clean up */
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ ReleaseConnection(conn);
+
+ return commands;
+}
+
+/*
* Create a tuple from the specified row of the PGresult.
*
* rel is the local representation of the foreign table, attinmeta is