diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2024-04-01 16:46:24 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2024-04-01 16:46:24 -0400 |
commit | 959b38d770ba1f8f35edab27ef3ccf8b1d99f5dd (patch) | |
tree | e1396f20b9672178d2ff3d452851078bc7b12202 /src/bin/pg_dump/pg_backup_archiver.c | |
parent | a45c78e3284b269085e9a0cbd0ea3b236b7180fa (diff) | |
download | postgresql-959b38d770ba1f8f35edab27ef3ccf8b1d99f5dd.tar.gz postgresql-959b38d770ba1f8f35edab27ef3ccf8b1d99f5dd.zip |
Invent --transaction-size option for pg_restore.
This patch allows pg_restore to wrap its commands into transaction
blocks, somewhat like --single-transaction, except that we commit
and start a new block after every N objects. Using this mode
with a size limit of 1000 or so objects greatly reduces the number
of transactions consumed by the restore, while preventing any
one transaction from taking enough locks to overrun the receiving
server's shared lock table.
(A value of 1000 works well with the default lock table size of
around 6400 locks. Higher --transaction-size values can be used
if one has increased the receiving server's lock table size.)
Excessive consumption of XIDs has been reported as a problem for
pg_upgrade in particular, but it could be bad for any restore; and the
change also reduces the number of fsyncs and amount of WAL generated,
so it should provide speed benefits too.
This patch does not try to make parallel workers batch the SQL
commands they issue. The trouble with doing that is that other
workers may need to see the objects a worker creates right away.
Possibly this can be improved later.
In this patch I have hard-wired pg_upgrade to use a transaction size
of 1000 divided by the number of parallel restore jobs allowed
(without that, we'd still be at risk of overrunning the shared lock
table). Perhaps there would be value in adding another pg_upgrade
option to allow user control of that, but I'm unsure that it's worth
the trouble; I think few users would use it, and any who did would see
not that much benefit compared to the default.
Patch by me, but the original idea to batch SQL commands during
restore is due to Robins Tharakan.
Discussion: https://postgr.es/m/a9f9376f1c3343a6bb319dce294e20ac@EX13D05UWC001.ant.amazon.com
Diffstat (limited to 'src/bin/pg_dump/pg_backup_archiver.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 139 |
1 files changed, 133 insertions, 6 deletions
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index d6e15e25a19..c7a6c918a65 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX) /* Otherwise, drop anything that's selected and has a dropStmt */ if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt) { + bool not_allowed_in_txn = false; + pg_log_info("dropping %s %s", te->desc, te->tag); + + /* + * In --transaction-size mode, we have to temporarily exit our + * transaction block to drop objects that can't be dropped + * within a transaction. + */ + if (ropt->txn_size > 0) + { + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + not_allowed_in_txn = true; + if (AH->connection) + CommitTransaction(AHX); + else + ahprintf(AH, "COMMIT;\n"); + } + } + /* Select owner and schema as necessary */ _becomeOwner(AH, te); _selectOutputSchema(AH, te->namespace); @@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX) } } } + + /* + * In --transaction-size mode, re-establish the transaction + * block if needed; otherwise, commit after every N drops. + */ + if (ropt->txn_size > 0) + { + if (not_allowed_in_txn) + { + if (AH->connection) + StartTransaction(AHX); + else + ahprintf(AH, "BEGIN;\n"); + AH->txnCount = 0; + } + else if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(AHX); + StartTransaction(AHX); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n"); + AH->txnCount = 0; + } + } } } @@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX) } } - if (ropt->single_txn) + /* + * Close out any persistent transaction we may have. While these two + * cases are started in different places, we can end both cases here. + */ + if (ropt->single_txn || ropt->txn_size > 0) { if (AH->connection) CommitTransaction(AHX); @@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) */ if ((reqs & REQ_SCHEMA) != 0) { + bool object_is_db = false; + + /* + * In --transaction-size mode, must exit our transaction block to + * create a database or set its properties. + */ + if (strcmp(te->desc, "DATABASE") == 0 || + strcmp(te->desc, "DATABASE PROPERTIES") == 0) + { + object_is_db = true; + if (ropt->txn_size > 0) + { + if (AH->connection) + CommitTransaction(&AH->public); + else + ahprintf(AH, "COMMIT;\n\n"); + } + } + /* Show namespace in log message if available */ if (te->namespace) pg_log_info("creating %s \"%s.%s\"", @@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) /* * If we created a DB, connect to it. Also, if we changed DB * properties, reconnect to ensure that relevant GUC settings are - * applied to our session. + * applied to our session. (That also restarts the transaction block + * in --transaction-size mode.) */ - if (strcmp(te->desc, "DATABASE") == 0 || - strcmp(te->desc, "DATABASE PROPERTIES") == 0) + if (object_is_db) { pg_log_info("connecting to new database \"%s\"", te->tag); _reconnectToDB(AH, te->tag); @@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) } } + /* + * If we emitted anything for this TOC entry, that counts as one action + * against the transaction-size limit. Commit if it's time to. + */ + if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0) + { + if (++AH->txnCount >= ropt->txn_size) + { + if (AH->connection) + { + CommitTransaction(&AH->public); + StartTransaction(&AH->public); + } + else + ahprintf(AH, "COMMIT;\nBEGIN;\n\n"); + AH->txnCount = 0; + } + } + if (AH->public.n_errors > 0 && status == WORKER_OK) status = WORKER_IGNORED_ERRORS; @@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + /* + * LOs must be restored within a transaction block, since we need the LO + * handle to stay open while we write it. Establish a transaction unless + * there's one being used globally. + */ + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) StartTransaction(&AH->public); @@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH) { RestoreOptions *ropt = AH->public.ropt; - if (!ropt->single_txn) + if (!(ropt->single_txn || ropt->txn_size > 0)) { if (AH->connection) CommitTransaction(&AH->public); @@ -3171,6 +3266,19 @@ _doSetFixedOutputState(ArchiveHandle *AH) else ahprintf(AH, "SET row_security = off;\n"); + /* + * In --transaction-size mode, we should always be in a transaction when + * we begin to restore objects. + */ + if (ropt && ropt->txn_size > 0) + { + if (AH->connection) + StartTransaction(&AH->public); + else + ahprintf(AH, "\nBEGIN;\n"); + AH->txnCount = 0; + } + ahprintf(AH, "\n"); } @@ -4044,6 +4152,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) } /* + * In --transaction-size mode, we must commit the open transaction before + * dropping the database connection. This also ensures that child workers + * can see the objects we've created so far. + */ + if (AH->public.ropt->txn_size > 0) + CommitTransaction(&AH->public); + + /* * Now close parent connection in prep for parallel steps. We do this * mainly to ensure that we don't exceed the specified number of parallel * connections. @@ -4782,6 +4898,10 @@ CloneArchive(ArchiveHandle *AH) clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle)); memcpy(clone, AH, sizeof(ArchiveHandle)); + /* Likewise flat-copy the RestoreOptions, so we can alter them locally */ + clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions)); + memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions)); + /* Handle format-independent fields */ memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse)); @@ -4804,6 +4924,13 @@ CloneArchive(ArchiveHandle *AH) clone->lo_buf = NULL; /* + * Clone connections disregard --transaction-size; they must commit after + * each command so that the results are immediately visible to other + * workers. + */ + clone->public.ropt->txn_size = 0; + + /* * Connect our new clone object to the database, using the same connection * parameters used for the original connection. */ |