diff options
Diffstat (limited to 'src/bin/pg_dump/pg_dump.c')
-rw-r--r-- | src/bin/pg_dump/pg_dump.c | 344 |
1 files changed, 154 insertions, 190 deletions
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ace4ffe34bb..84d12b118f4 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -12,7 +12,7 @@ * by PostgreSQL * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.571 2010/02/17 04:19:40 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.572 2010/02/18 01:29:10 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName); static char *getFormattedTypeName(Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); static const char *fmtQualifiedId(const char *schema, const char *id); -static bool hasBlobs(Archive *AH); +static void getBlobs(Archive *AH); +static void dumpBlob(Archive *AH, BlobInfo *binfo); static int dumpBlobs(Archive *AH, void *arg); -static int dumpBlobComments(Archive *AH, void *arg); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); @@ -701,25 +701,8 @@ main(int argc, char **argv) getTableDataFKConstraints(); } - if (outputBlobs && hasBlobs(g_fout)) - { - /* Add placeholders to allow correct sorting of blobs */ - DumpableObject *blobobj; - DumpableObject *blobcobj; - - blobobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobobj->objType = DO_BLOBS; - blobobj->catId = nilCatalogId; - AssignDumpId(blobobj); - blobobj->name = strdup("BLOBS"); - - blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobcobj->objType = DO_BLOB_COMMENTS; - blobcobj->catId = nilCatalogId; - AssignDumpId(blobcobj); - blobcobj->name = strdup("BLOB COMMENTS"); - addObjectDependency(blobcobj, blobobj->dumpId); - } + if (outputBlobs) + getBlobs(g_fout); /* * Collect dependency data to assist in ordering the objects. @@ -1808,7 +1791,7 @@ dumpDatabase(Archive *AH) appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid\n" "FROM pg_catalog.pg_class\n" - "WHERE oid = %d;\n", + "WHERE oid = %u;\n", LargeObjectRelationId); lo_res = PQexec(g_conn, loFrozenQry->data); @@ -1825,7 +1808,7 @@ dumpDatabase(Archive *AH) appendPQExpBuffer(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid.\n"); appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n" "SET relfrozenxid = '%u'\n" - "WHERE oid = %d;\n", + "WHERE oid = %u;\n", atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)), LargeObjectRelationId); ArchiveEntry(AH, nilCatalogId, createDumpId(), @@ -1938,40 +1921,135 @@ dumpStdStrings(Archive *AH) /* - * hasBlobs: - * Test whether database contains any large objects + * getBlobs: + * Collect schema-level data about large objects */ -static bool -hasBlobs(Archive *AH) +static void +getBlobs(Archive *AH) { - bool result; - const char *blobQry; - PGresult *res; + PQExpBuffer blobQry = createPQExpBuffer(); + BlobInfo *binfo; + DumpableObject *bdata; + PGresult *res; + int ntups; + int i; + + /* Verbose message */ + if (g_verbose) + write_msg(NULL, "reading large objects\n"); /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); - /* Check for BLOB OIDs */ + /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */ if (AH->remoteVersion >= 90000) - blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1"; + appendPQExpBuffer(blobQry, + "SELECT oid, (%s lomowner) AS rolname, lomacl" + " FROM pg_largeobject_metadata", + username_subquery); else if (AH->remoteVersion >= 70100) - blobQry = "SELECT loid FROM pg_largeobject LIMIT 1"; + appendPQExpBuffer(blobQry, + "SELECT DISTINCT loid, NULL::oid, NULL::oid" + " FROM pg_largeobject"); else - blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1"; + appendPQExpBuffer(blobQry, + "SELECT oid, NULL::oid, NULL::oid" + " FROM pg_class WHERE relkind = 'l'"); - res = PQexec(g_conn, blobQry); - check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK); + res = PQexec(g_conn, blobQry->data); + check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + if (ntups > 0) + { + /* + * Each large object has its own BLOB archive entry. + */ + binfo = (BlobInfo *) malloc(ntups * sizeof(BlobInfo)); - result = PQntuples(res) > 0; + for (i = 0; i < ntups; i++) + { + binfo[i].dobj.objType = DO_BLOB; + binfo[i].dobj.catId.tableoid = LargeObjectRelationId; + binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, 0)); + AssignDumpId(&binfo[i].dobj); + + binfo[i].dobj.name = strdup(PQgetvalue(res, i, 0)); + if (!PQgetisnull(res, i, 1)) + binfo[i].rolname = strdup(PQgetvalue(res, i, 1)); + else + binfo[i].rolname = ""; + if (!PQgetisnull(res, i, 2)) + binfo[i].blobacl = strdup(PQgetvalue(res, i, 2)); + else + binfo[i].blobacl = NULL; + } + + /* + * If we have any large objects, a "BLOBS" archive entry is needed. + * This is just a placeholder for sorting; it carries no data now. + */ + bdata = (DumpableObject *) malloc(sizeof(DumpableObject)); + bdata->objType = DO_BLOB_DATA; + bdata->catId = nilCatalogId; + AssignDumpId(bdata); + bdata->name = strdup("BLOBS"); + } PQclear(res); + destroyPQExpBuffer(blobQry); +} - return result; +/* + * dumpBlob + * + * dump the definition (metadata) of the given large object + */ +static void +dumpBlob(Archive *AH, BlobInfo *binfo) +{ + PQExpBuffer cquery = createPQExpBuffer(); + PQExpBuffer dquery = createPQExpBuffer(); + + appendPQExpBuffer(cquery, + "SELECT pg_catalog.lo_create('%s');\n", + binfo->dobj.name); + + appendPQExpBuffer(dquery, + "SELECT pg_catalog.lo_unlink('%s');\n", + binfo->dobj.name); + + ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId, + binfo->dobj.name, + NULL, NULL, + binfo->rolname, false, + "BLOB", SECTION_PRE_DATA, + cquery->data, dquery->data, NULL, + binfo->dobj.dependencies, binfo->dobj.nDeps, + NULL, NULL); + + /* set up tag for comment and/or ACL */ + resetPQExpBuffer(cquery); + appendPQExpBuffer(cquery, "LARGE OBJECT %s", binfo->dobj.name); + + /* Dump comment if any */ + dumpComment(AH, cquery->data, + NULL, binfo->rolname, + binfo->dobj.catId, 0, binfo->dobj.dumpId); + + /* Dump ACL if any */ + if (binfo->blobacl) + dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId, "LARGE OBJECT", + binfo->dobj.name, NULL, cquery->data, + NULL, binfo->rolname, binfo->blobacl); + + destroyPQExpBuffer(cquery); + destroyPQExpBuffer(dquery); } /* * dumpBlobs: - * dump all blobs + * dump the data contents of all large objects */ static int dumpBlobs(Archive *AH, void *arg) @@ -1980,6 +2058,7 @@ dumpBlobs(Archive *AH, void *arg) const char *blobFetchQry; PGresult *res; char buf[LOBBUFSIZE]; + int ntups; int i; int cnt; @@ -1989,7 +2068,10 @@ dumpBlobs(Archive *AH, void *arg) /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); - /* Cursor to get all BLOB OIDs */ + /* + * Currently, we re-fetch all BLOB OIDs using a cursor. Consider + * scanning the already-in-memory dumpable objects instead... + */ if (AH->remoteVersion >= 90000) blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70100) @@ -2012,7 +2094,8 @@ dumpBlobs(Archive *AH, void *arg) check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK); /* Process the tuples, if any */ - for (i = 0; i < PQntuples(res); i++) + ntups = PQntuples(res); + for (i = 0; i < ntups; i++) { Oid blobOid; int loFd; @@ -2022,8 +2105,8 @@ dumpBlobs(Archive *AH, void *arg) loFd = lo_open(g_conn, blobOid, INV_READ); if (loFd == -1) { - write_msg(NULL, "dumpBlobs(): could not open large object: %s", - PQerrorMessage(g_conn)); + write_msg(NULL, "dumpBlobs(): could not open large object %u: %s", + blobOid, PQerrorMessage(g_conn)); exit_nicely(); } @@ -2035,8 +2118,8 @@ dumpBlobs(Archive *AH, void *arg) cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE); if (cnt < 0) { - write_msg(NULL, "dumpBlobs(): error reading large object: %s", - PQerrorMessage(g_conn)); + write_msg(NULL, "dumpBlobs(): error reading large object %u: %s", + blobOid, PQerrorMessage(g_conn)); exit_nicely(); } @@ -2047,141 +2130,13 @@ dumpBlobs(Archive *AH, void *arg) EndBlob(AH, blobOid); } - } while (PQntuples(res) > 0); + } while (ntups > 0); PQclear(res); return 1; } -/* - * dumpBlobComments - * dump all blob properties. - * It has "BLOB COMMENTS" tag due to the historical reason, but note - * that it is the routine to dump all the properties of blobs. - * - * Since we don't provide any way to be selective about dumping blobs, - * there's no need to be selective about their comments either. We put - * all the comments into one big TOC entry. - */ -static int -dumpBlobComments(Archive *AH, void *arg) -{ - const char *blobQry; - const char *blobFetchQry; - PQExpBuffer cmdQry = createPQExpBuffer(); - PGresult *res; - int i; - - if (g_verbose) - write_msg(NULL, "saving large object properties\n"); - - /* Make sure we are in proper schema */ - selectSourceSchema("pg_catalog"); - - /* Cursor to get all BLOB comments */ - if (AH->remoteVersion >= 90000) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " - "obj_description(oid, 'pg_largeobject'), " - "pg_get_userbyid(lomowner), lomacl " - "FROM pg_largeobject_metadata"; - else if (AH->remoteVersion >= 70300) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " - "obj_description(loid, 'pg_largeobject'), NULL, NULL " - "FROM (SELECT DISTINCT loid FROM " - "pg_description d JOIN pg_largeobject l ON (objoid = loid) " - "WHERE classoid = 'pg_largeobject'::regclass) ss"; - else if (AH->remoteVersion >= 70200) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " - "obj_description(loid, 'pg_largeobject'), NULL, NULL " - "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; - else if (AH->remoteVersion >= 70100) - blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " - "obj_description(loid), NULL, NULL " - "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; - else - blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " - " ( " - " SELECT description " - " FROM pg_description pd " - " WHERE pd.objoid=pc.oid " - " ), NULL, NULL " - "FROM pg_class pc WHERE relkind = 'l'"; - - res = PQexec(g_conn, blobQry); - check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK); - - /* Command to fetch from cursor */ - blobFetchQry = "FETCH 100 IN blobcmt"; - - do - { - PQclear(res); - - /* Do a fetch */ - res = PQexec(g_conn, blobFetchQry); - check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK); - - /* Process the tuples, if any */ - for (i = 0; i < PQntuples(res); i++) - { - Oid blobOid = atooid(PQgetvalue(res, i, 0)); - char *lo_comment = PQgetvalue(res, i, 1); - char *lo_owner = PQgetvalue(res, i, 2); - char *lo_acl = PQgetvalue(res, i, 3); - char lo_name[32]; - - resetPQExpBuffer(cmdQry); - - /* comment on the blob */ - if (!PQgetisnull(res, i, 1)) - { - appendPQExpBuffer(cmdQry, - "COMMENT ON LARGE OBJECT %u IS ", blobOid); - appendStringLiteralAH(cmdQry, lo_comment, AH); - appendPQExpBuffer(cmdQry, ";\n"); - } - - /* dump blob ownership, if necessary */ - if (!PQgetisnull(res, i, 2)) - { - appendPQExpBuffer(cmdQry, - "ALTER LARGE OBJECT %u OWNER TO %s;\n", - blobOid, lo_owner); - } - - /* dump blob privileges, if necessary */ - if (!PQgetisnull(res, i, 3) && - !dataOnly && !aclsSkip) - { - snprintf(lo_name, sizeof(lo_name), "%u", blobOid); - if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT", - lo_acl, lo_owner, "", - AH->remoteVersion, cmdQry)) - { - write_msg(NULL, "could not parse ACL (%s) for " - "large object %u", lo_acl, blobOid); - exit_nicely(); - } - } - - if (cmdQry->len > 0) - { - appendPQExpBuffer(cmdQry, "\n"); - archputs(cmdQry->data, AH); - } - } - } while (PQntuples(res) > 0); - - PQclear(res); - - archputs("\n", AH); - - destroyPQExpBuffer(cmdQry); - - return 1; -} - static void binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer, Oid pg_type_oid) @@ -6140,9 +6095,17 @@ dumpComment(Archive *fout, const char *target, CommentItem *comments; int ncomments; - /* Comments are SCHEMA not data */ - if (dataOnly) - return; + /* Comments are schema not data ... except blob comments are data */ + if (strncmp(target, "LARGE OBJECT ", 13) != 0) + { + if (dataOnly) + return; + } + else + { + if (schemaOnly) + return; + } /* Search for comments associated with catalogId, using table */ ncomments = findComments(fout, catalogId.tableoid, catalogId.oid, @@ -6530,7 +6493,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_DEFAULT_ACL: dumpDefaultACL(fout, (DefaultACLInfo *) dobj); break; - case DO_BLOBS: + case DO_BLOB: + dumpBlob(fout, (BlobInfo *) dobj); + break; + case DO_BLOB_DATA: ArchiveEntry(fout, dobj->catId, dobj->dumpId, dobj->name, NULL, NULL, "", false, "BLOBS", SECTION_DATA, @@ -6538,14 +6504,6 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) dobj->dependencies, dobj->nDeps, dumpBlobs, NULL); break; - case DO_BLOB_COMMENTS: - ArchiveEntry(fout, dobj->catId, dobj->dumpId, - dobj->name, NULL, NULL, "", - false, "BLOB COMMENTS", SECTION_DATA, - "", "", NULL, - dobj->dependencies, dobj->nDeps, - dumpBlobComments, NULL); - break; } } @@ -10382,14 +10340,16 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo) * * 'objCatId' is the catalog ID of the underlying object. * 'objDumpId' is the dump ID of the underlying object. - * 'type' must be TABLE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE. + * 'type' must be one of + * TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE, + * FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT. * 'name' is the formatted name of the object. Must be quoted etc. already. * 'subname' is the formatted name of the sub-object, if any. Must be quoted. * 'tag' is the tag for the archive entry (typ. unquoted name of object). * 'nspname' is the namespace the object is in (NULL if none). * 'owner' is the owner, NULL if there is no owner (for languages). * 'acls' is the string read out of the fooacl system catalog field; - * it will be parsed here. + * it will be parsed here. *---------- */ static void @@ -10401,7 +10361,11 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId, PQExpBuffer sql; /* Do nothing if ACL dump is not enabled */ - if (dataOnly || aclsSkip) + if (aclsSkip) + return; + + /* --data-only skips ACLs *except* BLOB ACLs */ + if (dataOnly && strcmp(type, "LARGE OBJECT") != 0) return; sql = createPQExpBuffer(); |