Diffstat (limited to 'src/bin/pg_dump/pg_dump.c')
-rw-r--r--    src/bin/pg_dump/pg_dump.c    344
1 file changed, 154 insertions(+), 190 deletions(-)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ace4ffe34bb..84d12b118f4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -12,7 +12,7 @@
* by PostgreSQL
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.571 2010/02/17 04:19:40 tgl Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.572 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName);
static char *getFormattedTypeName(Oid oid, OidOptions opts);
static char *myFormatType(const char *typname, int32 typmod);
static const char *fmtQualifiedId(const char *schema, const char *id);
-static bool hasBlobs(Archive *AH);
+static void getBlobs(Archive *AH);
+static void dumpBlob(Archive *AH, BlobInfo *binfo);
static int dumpBlobs(Archive *AH, void *arg);
-static int dumpBlobComments(Archive *AH, void *arg);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
@@ -701,25 +701,8 @@ main(int argc, char **argv)
getTableDataFKConstraints();
}
- if (outputBlobs && hasBlobs(g_fout))
- {
- /* Add placeholders to allow correct sorting of blobs */
- DumpableObject *blobobj;
- DumpableObject *blobcobj;
-
- blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
- blobobj->objType = DO_BLOBS;
- blobobj->catId = nilCatalogId;
- AssignDumpId(blobobj);
- blobobj->name = strdup("BLOBS");
-
- blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
- blobcobj->objType = DO_BLOB_COMMENTS;
- blobcobj->catId = nilCatalogId;
- AssignDumpId(blobcobj);
- blobcobj->name = strdup("BLOB COMMENTS");
- addObjectDependency(blobcobj, blobobj->dumpId);
- }
+ if (outputBlobs)
+ getBlobs(g_fout);
/*
* Collect dependency data to assist in ordering the objects.
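The placeholder removed here is not gone; it moves into getBlobs() (below), which now registers one dumpable object per large object plus a single data placeholder. Both the old and the new code use the same registration idiom. A minimal sketch, using only the fields this diff itself touches (the full DumpableObject declaration lives in pg_dump.h):

	DumpableObject *obj = (DumpableObject *) malloc(sizeof(DumpableObject));

	obj->objType = DO_BLOB_DATA;	/* selects the dumper in dumpDumpableObject() */
	obj->catId = nilCatalogId;		/* placeholder: no backing catalog row */
	AssignDumpId(obj);				/* assign a dump ID and register the object */
	obj->name = strdup("BLOBS");	/* tag that appears in the archive TOC */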
@@ -1808,7 +1791,7 @@ dumpDatabase(Archive *AH)
appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid\n"
"FROM pg_catalog.pg_class\n"
- "WHERE oid = %d;\n",
+ "WHERE oid = %u;\n",
LargeObjectRelationId);
lo_res = PQexec(g_conn, loFrozenQry->data);
@@ -1825,7 +1808,7 @@ dumpDatabase(Archive *AH)
appendPQExpBuffer(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid.\n");
appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
"SET relfrozenxid = '%u'\n"
- "WHERE oid = %d;\n",
+ "WHERE oid = %u;\n",
atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
LargeObjectRelationId);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
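The two %d-to-%u fixes in this function are not cosmetic: Oid is an unsigned 32-bit type, so an OID at or above 2^31 printed with %d comes out negative. The constant here happens to be small enough that %d produced the right digits, but the idiom is wrong in general. A standalone illustration, not pg_dump code (the typedef mirrors postgres_ext.h):

	#include <stdio.h>

	typedef unsigned int Oid;		/* as in postgres_ext.h */

	int
	main(void)
	{
		Oid		big = 3000000000U;	/* a valid OID above 2^31 */

		printf("%d\n", (int) big);	/* prints -1294967296: wrong */
		printf("%u\n", big);		/* prints 3000000000: right */
		return 0;
	}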
@@ -1938,40 +1921,135 @@ dumpStdStrings(Archive *AH)
/*
- * hasBlobs:
- * Test whether database contains any large objects
+ * getBlobs:
+ * Collect schema-level data about large objects
*/
-static bool
-hasBlobs(Archive *AH)
+static void
+getBlobs(Archive *AH)
{
- bool result;
- const char *blobQry;
- PGresult *res;
+ PQExpBuffer blobQry = createPQExpBuffer();
+ BlobInfo *binfo;
+ DumpableObject *bdata;
+ PGresult *res;
+ int ntups;
+ int i;
+
+ /* Verbose message */
+ if (g_verbose)
+ write_msg(NULL, "reading large objects\n");
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
- /* Check for BLOB OIDs */
+ /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */
if (AH->remoteVersion >= 90000)
- blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
+ appendPQExpBuffer(blobQry,
+ "SELECT oid, (%s lomowner) AS rolname, lomacl"
+ " FROM pg_largeobject_metadata",
+ username_subquery);
else if (AH->remoteVersion >= 70100)
- blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
+ appendPQExpBuffer(blobQry,
+ "SELECT DISTINCT loid, NULL::oid, NULL::oid"
+ " FROM pg_largeobject");
else
- blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
+ appendPQExpBuffer(blobQry,
+ "SELECT oid, NULL::oid, NULL::oid"
+ " FROM pg_class WHERE relkind = 'l'");
- res = PQexec(g_conn, blobQry);
- check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
+ res = PQexec(g_conn, blobQry->data);
+ check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+ if (ntups > 0)
+ {
+ /*
+ * Each large object has its own BLOB archive entry.
+ */
+ binfo = (BlobInfo *) malloc(ntups * sizeof(BlobInfo));
- result = PQntuples(res) > 0;
+ for (i = 0; i < ntups; i++)
+ {
+ binfo[i].dobj.objType = DO_BLOB;
+ binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
+ binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, 0));
+ AssignDumpId(&binfo[i].dobj);
+
+ binfo[i].dobj.name = strdup(PQgetvalue(res, i, 0));
+ if (!PQgetisnull(res, i, 1))
+ binfo[i].rolname = strdup(PQgetvalue(res, i, 1));
+ else
+ binfo[i].rolname = "";
+ if (!PQgetisnull(res, i, 2))
+ binfo[i].blobacl = strdup(PQgetvalue(res, i, 2));
+ else
+ binfo[i].blobacl = NULL;
+ }
+
+ /*
+ * If we have any large objects, a "BLOBS" archive entry is needed.
+ * This is just a placeholder for sorting; it carries no data now.
+ */
+ bdata = (DumpableObject *) malloc(sizeof(DumpableObject));
+ bdata->objType = DO_BLOB_DATA;
+ bdata->catId = nilCatalogId;
+ AssignDumpId(bdata);
+ bdata->name = strdup("BLOBS");
+ }
PQclear(res);
+ destroyPQExpBuffer(blobQry);
+}
- return result;
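getBlobs() fills an array of BlobInfo, a struct this diff does not show (it is declared in pg_dump.h). From the way its fields are used above, it must look roughly like this sketch:

	typedef struct
	{
		DumpableObject dobj;	/* common header: type, catalog/dump IDs, name, deps */
		char	   *rolname;	/* owner name, or "" before 9.0 */
		char	   *blobacl;	/* raw ACL from lomacl, or NULL */
	} BlobInfo;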
+/*
+ * dumpBlob
+ *
+ * dump the definition (metadata) of the given large object
+ */
+static void
+dumpBlob(Archive *AH, BlobInfo *binfo)
+{
+ PQExpBuffer cquery = createPQExpBuffer();
+ PQExpBuffer dquery = createPQExpBuffer();
+
+ appendPQExpBuffer(cquery,
+ "SELECT pg_catalog.lo_create('%s');\n",
+ binfo->dobj.name);
+
+ appendPQExpBuffer(dquery,
+ "SELECT pg_catalog.lo_unlink('%s');\n",
+ binfo->dobj.name);
+
+ ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
+ binfo->dobj.name,
+ NULL, NULL,
+ binfo->rolname, false,
+ "BLOB", SECTION_PRE_DATA,
+ cquery->data, dquery->data, NULL,
+ binfo->dobj.dependencies, binfo->dobj.nDeps,
+ NULL, NULL);
+
+ /* set up tag for comment and/or ACL */
+ resetPQExpBuffer(cquery);
+ appendPQExpBuffer(cquery, "LARGE OBJECT %s", binfo->dobj.name);
+
+ /* Dump comment if any */
+ dumpComment(AH, cquery->data,
+ NULL, binfo->rolname,
+ binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+ /* Dump ACL if any */
+ if (binfo->blobacl)
+ dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId, "LARGE OBJECT",
+ binfo->dobj.name, NULL, cquery->data,
+ NULL, binfo->rolname, binfo->blobacl);
+
+ destroyPQExpBuffer(cquery);
+ destroyPQExpBuffer(dquery);
}
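On restore, each per-blob SECTION_PRE_DATA entry recreates the large object under its original OID (and can drop it again, e.g. for --clean), while the contents arrive later through the SECTION_DATA entry. A hedged libpq snippet showing the effect of the two generated commands (connection setup omitted; the OID is made up):

	PGresult   *r;

	/* the entry's creation command */
	r = PQexec(conn, "SELECT pg_catalog.lo_create('16385');");
	PQclear(r);
	/* the entry's drop command */
	r = PQexec(conn, "SELECT pg_catalog.lo_unlink('16385');");
	PQclear(r);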
/*
* dumpBlobs:
- * dump all blobs
+ * dump the data contents of all large objects
*/
static int
dumpBlobs(Archive *AH, void *arg)
@@ -1980,6 +2058,7 @@ dumpBlobs(Archive *AH, void *arg)
const char *blobFetchQry;
PGresult *res;
char buf[LOBBUFSIZE];
+ int ntups;
int i;
int cnt;
@@ -1989,7 +2068,10 @@ dumpBlobs(Archive *AH, void *arg)
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
- /* Cursor to get all BLOB OIDs */
+ /*
+ * Currently, we re-fetch all BLOB OIDs using a cursor. Consider
+ * scanning the already-in-memory dumpable objects instead...
+ */
if (AH->remoteVersion >= 90000)
blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
else if (AH->remoteVersion >= 70100)
@@ -2012,7 +2094,8 @@ dumpBlobs(Archive *AH, void *arg)
check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
/* Process the tuples, if any */
- for (i = 0; i < PQntuples(res); i++)
+ ntups = PQntuples(res);
+ for (i = 0; i < ntups; i++)
{
Oid blobOid;
int loFd;
@@ -2022,8 +2105,8 @@ dumpBlobs(Archive *AH, void *arg)
loFd = lo_open(g_conn, blobOid, INV_READ);
if (loFd == -1)
{
- write_msg(NULL, "dumpBlobs(): could not open large object: %s",
- PQerrorMessage(g_conn));
+ write_msg(NULL, "dumpBlobs(): could not open large object %u: %s",
+ blobOid, PQerrorMessage(g_conn));
exit_nicely();
}
@@ -2035,8 +2118,8 @@ dumpBlobs(Archive *AH, void *arg)
cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
if (cnt < 0)
{
- write_msg(NULL, "dumpBlobs(): error reading large object: %s",
- PQerrorMessage(g_conn));
+ write_msg(NULL, "dumpBlobs(): error reading large object %u: %s",
+ blobOid, PQerrorMessage(g_conn));
exit_nicely();
}
@@ -2047,141 +2130,13 @@ dumpBlobs(Archive *AH, void *arg)
EndBlob(AH, blobOid);
}
- } while (PQntuples(res) > 0);
+ } while (ntups > 0);
PQclear(res);
return 1;
}
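The fetch loop above follows the standard libpq cursor pattern. A self-contained sketch of the same shape, assuming a libpq connection conn (the cursor name and batch size here are illustrative, not necessarily pg_dump's):

	#include <stdlib.h>
	#include <libpq-fe.h>
	#include <libpq/libpq-fs.h>		/* INV_READ */

	static void
	scan_blobs(PGconn *conn)
	{
		PGresult   *res;
		int			ntups;
		int			i;

		PQclear(PQexec(conn, "BEGIN"));	/* cursors need a transaction */
		PQclear(PQexec(conn,
					   "DECLARE lo_cur CURSOR FOR"
					   " SELECT oid FROM pg_largeobject_metadata"));
		do
		{
			res = PQexec(conn, "FETCH 100 IN lo_cur");
			ntups = PQntuples(res);
			for (i = 0; i < ntups; i++)
			{
				Oid		loid = (Oid) strtoul(PQgetvalue(res, i, 0), NULL, 10);
				int		fd = lo_open(conn, loid, INV_READ);
				char	buf[16384];

				if (fd == -1)
					continue;	/* the real code reports the error and exits */
				while (lo_read(conn, fd, buf, sizeof(buf)) > 0)
					;			/* pg_dump hands each chunk to the archive */
				lo_close(conn, fd);
			}
			PQclear(res);
		} while (ntups > 0);
		PQclear(PQexec(conn, "COMMIT"));
	}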
-/*
- * dumpBlobComments
- * dump all blob properties.
- * It has "BLOB COMMENTS" tag due to the historical reason, but note
- * that it is the routine to dump all the properties of blobs.
- *
- * Since we don't provide any way to be selective about dumping blobs,
- * there's no need to be selective about their comments either. We put
- * all the comments into one big TOC entry.
- */
-static int
-dumpBlobComments(Archive *AH, void *arg)
-{
- const char *blobQry;
- const char *blobFetchQry;
- PQExpBuffer cmdQry = createPQExpBuffer();
- PGresult *res;
- int i;
-
- if (g_verbose)
- write_msg(NULL, "saving large object properties\n");
-
- /* Make sure we are in proper schema */
- selectSourceSchema("pg_catalog");
-
- /* Cursor to get all BLOB comments */
- if (AH->remoteVersion >= 90000)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
- "obj_description(oid, 'pg_largeobject'), "
- "pg_get_userbyid(lomowner), lomacl "
- "FROM pg_largeobject_metadata";
- else if (AH->remoteVersion >= 70300)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
- "obj_description(loid, 'pg_largeobject'), NULL, NULL "
- "FROM (SELECT DISTINCT loid FROM "
- "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
- "WHERE classoid = 'pg_largeobject'::regclass) ss";
- else if (AH->remoteVersion >= 70200)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
- "obj_description(loid, 'pg_largeobject'), NULL, NULL "
- "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
- else if (AH->remoteVersion >= 70100)
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
- "obj_description(loid), NULL, NULL "
- "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
- else
- blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
- " ( "
- " SELECT description "
- " FROM pg_description pd "
- " WHERE pd.objoid=pc.oid "
- " ), NULL, NULL "
- "FROM pg_class pc WHERE relkind = 'l'";
-
- res = PQexec(g_conn, blobQry);
- check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
-
- /* Command to fetch from cursor */
- blobFetchQry = "FETCH 100 IN blobcmt";
-
- do
- {
- PQclear(res);
-
- /* Do a fetch */
- res = PQexec(g_conn, blobFetchQry);
- check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
-
- /* Process the tuples, if any */
- for (i = 0; i < PQntuples(res); i++)
- {
- Oid blobOid = atooid(PQgetvalue(res, i, 0));
- char *lo_comment = PQgetvalue(res, i, 1);
- char *lo_owner = PQgetvalue(res, i, 2);
- char *lo_acl = PQgetvalue(res, i, 3);
- char lo_name[32];
-
- resetPQExpBuffer(cmdQry);
-
- /* comment on the blob */
- if (!PQgetisnull(res, i, 1))
- {
- appendPQExpBuffer(cmdQry,
- "COMMENT ON LARGE OBJECT %u IS ", blobOid);
- appendStringLiteralAH(cmdQry, lo_comment, AH);
- appendPQExpBuffer(cmdQry, ";\n");
- }
-
- /* dump blob ownership, if necessary */
- if (!PQgetisnull(res, i, 2))
- {
- appendPQExpBuffer(cmdQry,
- "ALTER LARGE OBJECT %u OWNER TO %s;\n",
- blobOid, lo_owner);
- }
-
- /* dump blob privileges, if necessary */
- if (!PQgetisnull(res, i, 3) &&
- !dataOnly && !aclsSkip)
- {
- snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
- if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
- lo_acl, lo_owner, "",
- AH->remoteVersion, cmdQry))
- {
- write_msg(NULL, "could not parse ACL (%s) for "
- "large object %u", lo_acl, blobOid);
- exit_nicely();
- }
- }
-
- if (cmdQry->len > 0)
- {
- appendPQExpBuffer(cmdQry, "\n");
- archputs(cmdQry->data, AH);
- }
- }
- } while (PQntuples(res) > 0);
-
- PQclear(res);
-
- archputs("\n", AH);
-
- destroyPQExpBuffer(cmdQry);
-
- return 1;
-}
-
static void
binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer,
Oid pg_type_oid)
@@ -6140,9 +6095,17 @@ dumpComment(Archive *fout, const char *target,
CommentItem *comments;
int ncomments;
- /* Comments are SCHEMA not data */
- if (dataOnly)
- return;
+ /* Comments are schema not data ... except blob comments are data */
+ if (strncmp(target, "LARGE OBJECT ", 13) != 0)
+ {
+ if (dataOnly)
+ return;
+ }
+ else
+ {
+ if (schemaOnly)
+ return;
+ }
/* Search for comments associated with catalogId, using table */
ncomments = findComments(fout, catalogId.tableoid, catalogId.oid,
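The asymmetry above is deliberate: blob comments count as data because the blobs they describe only exist in a data dump, so --schema-only must skip them while --data-only must keep them; all other comments remain schema. The test, factored into a hypothetical helper for clarity (not part of the patch):

	static bool
	skip_comment(const char *target, bool dataOnly, bool schemaOnly)
	{
		bool		is_blob = (strncmp(target, "LARGE OBJECT ", 13) == 0);

		return is_blob ? schemaOnly : dataOnly;
	}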
@@ -6530,7 +6493,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_DEFAULT_ACL:
dumpDefaultACL(fout, (DefaultACLInfo *) dobj);
break;
- case DO_BLOBS:
+ case DO_BLOB:
+ dumpBlob(fout, (BlobInfo *) dobj);
+ break;
+ case DO_BLOB_DATA:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
@@ -6538,14 +6504,6 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
dobj->dependencies, dobj->nDeps,
dumpBlobs, NULL);
break;
- case DO_BLOB_COMMENTS:
- ArchiveEntry(fout, dobj->catId, dobj->dumpId,
- dobj->name, NULL, NULL, "",
- false, "BLOB COMMENTS", SECTION_DATA,
- "", "", NULL,
- dobj->dependencies, dobj->nDeps,
- dumpBlobComments, NULL);
- break;
}
}
@@ -10382,14 +10340,16 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo)
*
* 'objCatId' is the catalog ID of the underlying object.
* 'objDumpId' is the dump ID of the underlying object.
- * 'type' must be TABLE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE.
+ * 'type' must be one of
+ * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
+ * FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT.
* 'name' is the formatted name of the object. Must be quoted etc. already.
* 'subname' is the formatted name of the sub-object, if any. Must be quoted.
* 'tag' is the tag for the archive entry (typ. unquoted name of object).
* 'nspname' is the namespace the object is in (NULL if none).
* 'owner' is the owner, NULL if there is no owner (for languages).
* 'acls' is the string read out of the fooacl system catalog field;
- * it will be parsed here.
+ * it will be parsed here.
*----------
*/
static void
@@ -10401,7 +10361,11 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId,
PQExpBuffer sql;
/* Do nothing if ACL dump is not enabled */
- if (dataOnly || aclsSkip)
+ if (aclsSkip)
+ return;
+
+ /* --data-only skips ACLs *except* BLOB ACLs */
+ if (dataOnly && strcmp(type, "LARGE OBJECT") != 0)
return;
sql = createPQExpBuffer();
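Same idea for ACLs: blob ACLs must survive --data-only because the blobs themselves are dumped as data. The combined test as a hypothetical predicate, again not part of the patch:

	static bool
	skip_acl(const char *type, bool dataOnly, bool aclsSkip)
	{
		if (aclsSkip)			/* -x / --no-privileges wins unconditionally */
			return true;
		return dataOnly && strcmp(type, "LARGE OBJECT") != 0;
	}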