Diffstat (limited to 'src/backend/replication')
-rw-r--r--             src/backend/replication/libpqwalreceiver/libpqwalreceiver.c |    6
-rw-r--r--             src/backend/replication/logical/Makefile                    |    1
-rw-r--r--             src/backend/replication/logical/applyparallelworker.c       | 1630
-rw-r--r--             src/backend/replication/logical/decode.c                    |    5
-rw-r--r--             src/backend/replication/logical/launcher.c                  |  220
-rw-r--r--             src/backend/replication/logical/meson.build                 |    1
-rw-r--r--             src/backend/replication/logical/origin.c                    |   26
-rw-r--r--             src/backend/replication/logical/proto.c                     |   37
-rw-r--r--             src/backend/replication/logical/reorderbuffer.c             |    5
-rw-r--r--             src/backend/replication/logical/tablesync.c                 |   25
-rwxr-xr-x[-rw-r--r--] src/backend/replication/logical/worker.c                    | 1354
-rw-r--r--             src/backend/replication/pgoutput/pgoutput.c                 |   22
12 files changed, 3033 insertions(+), 299 deletions(-)
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 219cd73b7fc..c40c6220db8 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -443,9 +443,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, "proto_version '%u'", options->proto.logical.proto_version); - if (options->proto.logical.streaming && - PQserverVersion(conn->streamConn) >= 140000) - appendStringInfoString(&cmd, ", streaming 'on'"); + if (options->proto.logical.streaming_str) + appendStringInfo(&cmd, ", streaming '%s'", + options->proto.logical.streaming_str); if (options->proto.logical.twophase && PQserverVersion(conn->streamConn) >= 150000) diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c4e2fdeb719..2dc25e37bb8 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = \ + applyparallelworker.o \ decode.o \ launcher.o \ logical.o \ diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c new file mode 100644 index 00000000000..2e5914d5d95 --- /dev/null +++ b/src/backend/replication/logical/applyparallelworker.c @@ -0,0 +1,1630 @@ +/*------------------------------------------------------------------------- + * applyparallelworker.c + * Support routines for applying a transaction by a parallel apply worker + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/applyparallelworker.c + * + * This file contains the code to launch, set up, and tear down a parallel apply + * worker which receives the changes from the leader worker and invokes routines + * to apply those changes on the subscriber database. Additionally, this file + * contains routines that are intended to support setting up, using, and tearing + * down a ParallelApplyWorkerInfo, which is required so that the leader worker + * and parallel apply workers can communicate with each other. + * + * The parallel apply workers are assigned (if available) as soon as a + * transaction's first stream is received for subscriptions that have set their + * 'streaming' option to parallel. The leader apply worker will send changes to + * this new worker via shared memory. We keep this worker assigned until the + * transaction commit is received and also wait for the worker to finish at + * commit. This preserves commit ordering and avoids file I/O in most cases, + * although we still need to spill to a file if there is no worker available. + * See the comments atop logical/worker.c for more about streamed xacts whose + * changes are spilled to disk. It is important to maintain commit order to + * avoid failures due to: (a) transaction dependencies - say if we insert a row + * in the first transaction and update it in the second transaction on the + * publisher, then allowing the subscriber to apply both in parallel can lead + * to a failure in the update; (b) deadlocks - allowing transactions that + * update the same set of rows/tables in the opposite order to be applied in + * parallel can lead to deadlocks. + * + * A worker pool is used to avoid restarting workers for each streaming + * transaction. 
We maintain each worker's information (ParallelApplyWorkerInfo) + * in the ParallelApplyWorkerPool. After successfully launching a new worker, + * its information is added to the ParallelApplyWorkerPool. Once the worker + * finishes applying the transaction, it is marked as available for re-use. + * Before starting a new worker to apply a streaming transaction, we first + * check the list for any available worker. Note that we retain a maximum of + * half of the max_parallel_apply_workers_per_subscription workers in the pool, + * and beyond that we simply exit the worker after it has applied its + * transaction. + * + * XXX This worker pool threshold is arbitrary and we can provide a GUC + * variable for this in the future if required. + * + * The leader apply worker will create a separate dynamic shared memory segment + * when each parallel apply worker starts. The reason for this design is that + * we cannot predict how many workers will be needed. It may be possible to + * allocate enough shared memory in one segment based on the maximum number of + * parallel apply workers (max_parallel_apply_workers_per_subscription), but + * this would waste memory if no process is actually started. + * + * The dynamic shared memory segment contains: (a) a shm_mq that is used to + * send changes in the transaction from the leader apply worker to the parallel + * apply worker; (b) another shm_mq that is used to send errors (and other + * messages reported via elog/ereport) from the parallel apply worker to the + * leader apply worker; (c) necessary information to be shared among parallel + * apply workers and the leader apply worker (i.e. members of + * ParallelApplyWorkerShared). + * + * Locking Considerations + * ---------------------- + * We have a risk of deadlock due to concurrently applying the transactions in + * parallel mode that were independent on the publisher side but became + * dependent on the subscriber side due to the different database structures + * (like schema of subscription tables, constraints, etc.) on each side. This + * can happen even without parallel mode when there are concurrent operations + * on the subscriber. In order to detect the deadlocks among the leader (LA) + * and parallel apply (PA) workers, we use lmgr locks when the PA waits for the + * next stream (set of changes) and LA waits for PA to finish the transaction. + * An alternative approach could be to not allow parallelism when the schema of + * tables is different between the publisher and subscriber, but that would be + * too restrictive and would require the publisher to send much more + * information than it is currently sending. + * + * Consider a case where the subscribed table does not have a unique key on the + * publisher and has a unique key on the subscriber. The deadlock can happen in + * the following ways: + * + * 1) Deadlock between the leader apply worker and a parallel apply worker + * + * Consider that the parallel apply worker (PA) is executing TX-1 and the + * leader apply worker (LA) is executing TX-2 concurrently on the subscriber. + * Now, LA is waiting for PA because of the unique key constraint of the + * subscribed table while PA is waiting for LA to send the next stream of + * changes or transaction finish command message. + * + * In order for lmgr to detect this, we have LA acquire a session lock on the + * remote transaction (by pa_lock_stream()) and have PA wait on the lock before + * trying to receive the next stream of changes. 
Specifically, LA will acquire + * the lock in AccessExclusive mode before sending the STREAM_STOP and will + * release it (if already acquired) after sending the STREAM_START, STREAM_ABORT + * (for a toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will + * acquire the lock in AccessShare mode after processing STREAM_STOP and + * STREAM_ABORT (for a subtransaction) and then release the lock immediately + * after acquiring it. + * + * The lock graph for the above example will look as follows: + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to + * acquire the stream lock) -> LA + * + * This way, when PA is waiting for LA for the next stream of changes, we can + * have a wait-edge from PA to LA in lmgr, which will make us detect the + * deadlock between LA and PA. + * + * 2) Deadlock between the leader apply worker and parallel apply workers + * + * This scenario is similar to the first case, but TX-1 and TX-2 are executed + * by two parallel apply workers (PA-1 and PA-2 respectively). In this scenario, + * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting + * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its + * transaction in order to preserve the commit order. There is a deadlock among + * the three processes. + * + * In order for lmgr to detect this, we have PA acquire a session lock (this is + * a different lock than the one referred to in the previous case; see + * pa_lock_transaction()) on the transaction being applied and have LA wait on + * the lock before proceeding in the transaction finish commands. Specifically, + * PA will acquire this lock in AccessExclusive mode before executing the first + * message of the transaction and release it at the xact end. LA will acquire + * this lock in AccessShare mode at transaction finish commands and release it + * immediately. + * + * The lock graph for the above example will look as follows: + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream + * lock) -> LA + * + * This way, when LA is waiting to finish the transaction end command to + * preserve the commit order, we will be able to detect a deadlock, if any. + * + * One might think we can use XactLockTableWait(), but XactLockTableWait() + * considers a PREPARED TRANSACTION to be still in progress, which means the + * lock won't be released even after the parallel apply worker has prepared the + * transaction. + * + * 3) Deadlock when the shm_mq buffer is full + * + * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions + * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to + * wait to send messages, and this wait doesn't appear in lmgr. + * + * To avoid this wait, we use a non-blocking write and wait with a timeout. If + * the timeout is exceeded, the LA will serialize all the pending messages to + * a file and indicate to PA-2 that it needs to read that file for the remaining + * messages. Then LA will start waiting for commit as in the previous case, + * which will detect a deadlock, if any. See pa_send_data() and + * enum TransApplyAction. + * + * Lock types + * ---------- + * Both the stream lock and the transaction lock mentioned above are + * session-level locks because both locks could be acquired outside the + * transaction, and the stream lock in the leader needs to persist across + * transaction boundaries, i.e. until the end of the streaming transaction. 
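As a reading aid for the handshake just described, here is a condensed, illustrative sketch (not part of the patch) of the two sides of the stream lock, using the pa_lock_stream()/pa_unlock_stream() helpers defined later in this file; the xid variable and the send step are placeholders:

    /* Leader (LA), around one streamed chunk -- illustrative only */
    pa_lock_stream(xid, AccessExclusiveLock);   /* taken before sending STREAM_STOP */
    /* ... send STREAM_STOP, do other work ... */
    pa_unlock_stream(xid, AccessExclusiveLock); /* released after STREAM_START etc. */

    /* Parallel apply worker (PA), after processing STREAM_STOP */
    pa_lock_stream(xid, AccessShareLock);       /* blocks while LA holds the lock */
    pa_unlock_stream(xid, AccessShareLock);     /* the wait gives lmgr a PA -> LA edge */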
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "pgstat.h" +#include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/origin.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "tcop/tcopprot.h" +#include "utils/inval.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067 + +/* + * DSM keys for parallel apply worker. Unlike other parallel execution code, + * since we don't need to worry about DSM keys conflicting with plan_node_id we + * can use small integers. + */ +#define PARALLEL_APPLY_KEY_SHARED 1 +#define PARALLEL_APPLY_KEY_MQ 2 +#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3 + +/* Queue size of DSM, 16 MB for now. */ +#define DSM_QUEUE_SIZE (16 * 1024 * 1024) + +/* + * Error queue size of DSM. It is desirable to make it large enough that a + * typical ErrorResponse can be sent without blocking. That way, a worker that + * errors out can write the whole message into the queue and terminate without + * waiting for the user backend. + */ +#define DSM_ERROR_QUEUE_SIZE (16 * 1024) + +/* + * There are three fields in each message received by the parallel apply + * worker: start_lsn, end_lsn and send_time. Because we have updated these + * statistics in the leader apply worker, we can ignore these fields in the + * parallel apply worker (see function LogicalRepApplyLoop). + */ +#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz)) + +/* + * The type of session-level lock on a transaction being applied on a logical + * replication subscriber. + */ +#define PARALLEL_APPLY_LOCK_STREAM 0 +#define PARALLEL_APPLY_LOCK_XACT 1 + +/* + * Hash table entry to map xid to the parallel apply worker state. + */ +typedef struct ParallelApplyWorkerEntry +{ + TransactionId xid; /* Hash key -- must be first */ + ParallelApplyWorkerInfo *winfo; +} ParallelApplyWorkerEntry; + +/* + * A hash table used to cache the state of streaming transactions being applied + * by the parallel apply workers. + */ +static HTAB *ParallelApplyTxnHash = NULL; + +/* +* A list (pool) of active parallel apply workers. The information for +* the new worker is added to the list after successfully launching it. The +* list entry is removed if there are already enough workers in the worker +* pool at the end of the transaction. For more information about the worker +* pool, see comments atop this file. + */ +static List *ParallelApplyWorkerPool = NIL; + +/* + * Information shared between leader apply worker and parallel apply worker. + */ +ParallelApplyWorkerShared *MyParallelShared = NULL; + +/* + * Is there a message sent by a parallel apply worker that the leader apply + * worker needs to receive? + */ +volatile sig_atomic_t ParallelApplyMessagePending = false; + +/* + * Cache the parallel apply worker information required for applying the + * current streaming transaction. It is used to save the cost of searching the + * hash table when applying the changes between STREAM_START and STREAM_STOP. + */ +static ParallelApplyWorkerInfo *stream_apply_worker = NULL; + +/* A list to maintain subtransactions, if any. 
*/ +static List *subxactlist = NIL; + +static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); +static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); +static PartialFileSetState pa_get_fileset_state(void); + +/* + * Returns true if it is OK to start a parallel apply worker, false otherwise. + */ +static bool +pa_can_start(void) +{ + /* Only leader apply workers can start parallel apply workers. */ + if (!am_leader_apply_worker()) + return false; + + /* + * It is good to check for any change in the subscription parameter to + * avoid the case where for a very long time the change doesn't get + * reflected. This can happen when there is a constant flow of streaming + * transactions that are handled by parallel apply workers. + * + * It is better to do it before the below checks so that the latest values + * of subscription can be used for the checks. + */ + maybe_reread_subscription(); + + /* + * Don't start a new parallel apply worker if the subscription is not + * using parallel streaming mode, or if the publisher does not support + * parallel apply. + */ + if (!MyLogicalRepWorker->parallel_apply) + return false; + + /* + * Don't start a new parallel worker if user has set skiplsn as it's + * possible that they want to skip the streaming transaction. For + * streaming transactions, we need to serialize the transaction to a file + * so that we can get the last LSN of the transaction to judge whether to + * skip before starting to apply the change. + * + * One might think that we could allow parallelism if the first lsn of the + * transaction is greater than skiplsn, but we don't send it with the + * STREAM START message, and it doesn't seem worth sending the extra eight + * bytes with the STREAM START to enable parallelism for this case. + */ + if (!XLogRecPtrIsInvalid(MySubscription->skiplsn)) + return false; + + /* + * For streaming transactions that are being applied using a parallel + * apply worker, we cannot decide whether to apply the change for a + * relation that is not in the READY state (see + * should_apply_changes_for_rel) as we won't know remote_final_lsn by that + * time. So, we don't start the new parallel apply worker in this case. + */ + if (!AllTablesyncsReady()) + return false; + + return true; +} + +/* + * Set up a dynamic shared memory segment. + * + * We set up a control region that contains a fixed-size worker info + * (ParallelApplyWorkerShared), a message queue, and an error queue. + * + * Returns true on success, false on failure. + */ +static bool +pa_setup_dsm(ParallelApplyWorkerInfo *winfo) +{ + shm_toc_estimator e; + Size segsize; + dsm_segment *seg; + shm_toc *toc; + ParallelApplyWorkerShared *shared; + shm_mq *mq; + Size queue_size = DSM_QUEUE_SIZE; + Size error_queue_size = DSM_ERROR_QUEUE_SIZE; + + /* + * Estimate how much shared memory we need. + * + * Because the TOC machinery may choose to insert padding of oddly-sized + * requests, we must estimate each chunk separately. + * + * We need one key to register the location of the header, and two other + * keys to track the locations of the message queue and the error message + * queue. + */ + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared)); + shm_toc_estimate_chunk(&e, queue_size); + shm_toc_estimate_chunk(&e, error_queue_size); + + shm_toc_estimate_keys(&e, 3); + segsize = shm_toc_estimate(&e); + + /* Create the shared memory segment and establish a table of contents. 
*/ + seg = dsm_create(shm_toc_estimate(&e), 0); + if (!seg) + return false; + + toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg), + segsize); + + /* Set up the header region. */ + shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared)); + SpinLockInit(&shared->mutex); + + shared->xact_state = PARALLEL_TRANS_UNKNOWN; + pg_atomic_init_u32(&(shared->pending_stream_count), 0); + shared->last_commit_end = InvalidXLogRecPtr; + shared->fileset_state = FS_EMPTY; + + shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared); + + /* Set up message queue for the worker. */ + mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size); + shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq); + shm_mq_set_sender(mq, MyProc); + + /* Attach the queue. */ + winfo->mq_handle = shm_mq_attach(mq, seg, NULL); + + /* Set up error queue for the worker. */ + mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size), + error_queue_size); + shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq); + shm_mq_set_receiver(mq, MyProc); + + /* Attach the queue. */ + winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL); + + /* Return results to caller. */ + winfo->dsm_seg = seg; + winfo->shared = shared; + + return true; +} + +/* + * Try to get a parallel apply worker from the pool. If none is available then + * start a new one. + */ +static ParallelApplyWorkerInfo * +pa_launch_parallel_worker(void) +{ + MemoryContext oldcontext; + bool launched; + ParallelApplyWorkerInfo *winfo; + ListCell *lc; + + /* Try to get an available parallel apply worker from the worker pool. */ + foreach(lc, ParallelApplyWorkerPool) + { + winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + if (!winfo->in_use) + return winfo; + } + + /* + * Start a new parallel apply worker. + * + * The worker info can be used for the lifetime of the worker process, so + * create it in a permanent context. + */ + oldcontext = MemoryContextSwitchTo(ApplyContext); + + winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo)); + + /* Setup shared memory. */ + if (!pa_setup_dsm(winfo)) + { + MemoryContextSwitchTo(oldcontext); + pfree(winfo); + return NULL; + } + + launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + InvalidOid, + dsm_segment_handle(winfo->dsm_seg)); + + if (launched) + { + ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); + } + else + { + pa_free_worker_info(winfo); + winfo = NULL; + } + + MemoryContextSwitchTo(oldcontext); + + return winfo; +} + +/* + * Allocate a parallel apply worker that will be used for the specified xid. + * + * We first try to get an available worker from the pool, if any and then try + * to launch a new worker. On successful allocation, remember the worker + * information in the hash table so that we can get it later for processing the + * streaming changes. + */ +void +pa_allocate_worker(TransactionId xid) +{ + bool found; + ParallelApplyWorkerInfo *winfo = NULL; + ParallelApplyWorkerEntry *entry; + + if (!pa_can_start()) + return; + + /* First time through, initialize parallel apply worker state hashtable. 
*/ + if (!ParallelApplyTxnHash) + { + HASHCTL ctl; + + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(TransactionId); + ctl.entrysize = sizeof(ParallelApplyWorkerEntry); + ctl.hcxt = ApplyContext; + + ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash", + 16, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + winfo = pa_launch_parallel_worker(); + if (!winfo) + return; + + /* Create an entry for the requested transaction. */ + entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found); + if (found) + elog(ERROR, "hash table corrupted"); + + /* Update the transaction information in shared memory. */ + SpinLockAcquire(&winfo->shared->mutex); + winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; + winfo->shared->xid = xid; + SpinLockRelease(&winfo->shared->mutex); + + winfo->in_use = true; + winfo->serialize_changes = false; + entry->winfo = winfo; + entry->xid = xid; +} + +/* + * Find the assigned worker for the given transaction, if any. + */ +ParallelApplyWorkerInfo * +pa_find_worker(TransactionId xid) +{ + bool found; + ParallelApplyWorkerEntry *entry; + + if (!TransactionIdIsValid(xid)) + return NULL; + + if (!ParallelApplyTxnHash) + return NULL; + + /* Return the cached parallel apply worker if valid. */ + if (stream_apply_worker) + return stream_apply_worker; + + /* Find an entry for the requested transaction. */ + entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found); + if (found) + { + /* The worker must not have exited. */ + Assert(entry->winfo->in_use); + return entry->winfo; + } + + return NULL; +} + +/* + * Makes the worker available for reuse. + * + * This removes the parallel apply worker entry from the hash table so that it + * can't be looked up again. If there are enough workers in the pool, it stops + * the worker and frees the corresponding info. Otherwise, it just marks the + * worker as available for reuse. + * + * For more information about the worker pool, see comments atop this file. + */ +static void +pa_free_worker(ParallelApplyWorkerInfo *winfo) +{ + Assert(!am_parallel_apply_worker()); + Assert(winfo->in_use); + Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); + + if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL)) + elog(ERROR, "hash table corrupted"); + + /* + * Stop the worker if there are enough workers in the pool. + * + * XXX Additionally, we also stop the worker if the leader apply worker + * serialized part of the transaction data due to a send timeout. This is + * because the message could be partially written to the queue and there + * is no way to clean the queue other than resending the message until it + * succeeds. Instead of trying to send the data, which would have been + * serialized anyway, and then letting the parallel apply worker deal with + * the spurious message, we stop the worker. + */ + if (winfo->serialize_changes || + list_length(ParallelApplyWorkerPool) > + (max_parallel_apply_workers_per_subscription / 2)) + { + int slot_no; + uint16 generation; + + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + + logicalrep_pa_worker_stop(slot_no, generation); + + pa_free_worker_info(winfo); + + return; + } + + winfo->in_use = false; + winfo->serialize_changes = false; +} + +/* + * Free the parallel apply worker information and unlink the files with + * serialized changes, if any. 
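To make the retention rule in pa_free_worker() concrete: with the default max_parallel_apply_workers_per_subscription = 2, at most one finished worker stays pooled; with a hypothetical setting of 8, up to four stay pooled and the fifth one to finish is stopped. The deciding test is just (illustrative restatement of the code above):

    if (winfo->serialize_changes ||
        list_length(ParallelApplyWorkerPool) >
        (max_parallel_apply_workers_per_subscription / 2))
    {
        /* stop the worker and free its info (see pa_free_worker above) */
    }
    else
        winfo->in_use = false;  /* keep it pooled for reuse */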
+ */ +static void +pa_free_worker_info(ParallelApplyWorkerInfo *winfo) +{ + Assert(winfo); + + if (winfo->mq_handle) + shm_mq_detach(winfo->mq_handle); + + if (winfo->error_mq_handle) + shm_mq_detach(winfo->error_mq_handle); + + /* Unlink the files with serialized changes. */ + if (winfo->serialize_changes) + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid); + + if (winfo->dsm_seg) + dsm_detach(winfo->dsm_seg); + + /* Remove from the worker pool. */ + ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo); + + pfree(winfo); +} + +/* + * Detach the error queue for all parallel apply workers. + */ +void +pa_detach_all_error_mq(void) +{ + ListCell *lc; + + foreach(lc, ParallelApplyWorkerPool) + { + ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } +} + +/* + * Check if there are any pending spooled messages. + */ +static bool +pa_has_spooled_message_pending() +{ + PartialFileSetState fileset_state; + + fileset_state = pa_get_fileset_state(); + + return (fileset_state != FS_EMPTY); +} + +/* + * Replay the spooled messages once the leader apply worker has finished + * serializing changes to the file. + * + * Returns false if there aren't any pending spooled messages, true otherwise. + */ +static bool +pa_process_spooled_messages_if_required(void) +{ + PartialFileSetState fileset_state; + + fileset_state = pa_get_fileset_state(); + + if (fileset_state == FS_EMPTY) + return false; + + /* + * If the leader apply worker is busy serializing the partial changes then + * acquire the stream lock now and wait for the leader worker to finish + * serializing the changes. Otherwise, the parallel apply worker won't get + * a chance to receive a STREAM_STOP (and acquire the stream lock) until + * the leader had serialized all changes which can lead to undetected + * deadlock. + * + * Note that the fileset state can be FS_SERIALIZE_DONE once the leader + * worker has finished serializing the changes. + */ + if (fileset_state == FS_SERIALIZE_IN_PROGRESS) + { + pa_lock_stream(MyParallelShared->xid, AccessShareLock); + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); + + fileset_state = pa_get_fileset_state(); + } + + /* + * We cannot read the file immediately after the leader has serialized all + * changes to the file because there may still be messages in the memory + * queue. We will apply all spooled messages the next time we call this + * function and that will ensure there are no messages left in the memory + * queue. + */ + if (fileset_state == FS_SERIALIZE_DONE) + { + pa_set_fileset_state(MyParallelShared, FS_READY); + } + else if (fileset_state == FS_READY) + { + apply_spooled_messages(&MyParallelShared->fileset, + MyParallelShared->xid, + InvalidXLogRecPtr); + pa_set_fileset_state(MyParallelShared, FS_EMPTY); + } + + return true; +} + +/* + * Interrupt handler for main loop of parallel apply worker. + */ +static void +ProcessParallelApplyInterrupts(void) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished", + MySubscription->name))); + + proc_exit(0); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } +} + +/* Parallel apply worker main loop. 
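The fileset states that pa_process_spooled_messages_if_required() reacts to form a small state machine; summarizing the transitions described above (a reading aid, assuming the PartialFileSetState names are declared in worker_internal.h):

    /*
     * FS_EMPTY                 -> FS_SERIALIZE_IN_PROGRESS  leader hit a send timeout
     * FS_SERIALIZE_IN_PROGRESS -> FS_SERIALIZE_DONE         leader finished writing the file
     * FS_SERIALIZE_DONE        -> FS_READY                  PA noticed it; drains its queue first
     * FS_READY                 -> FS_EMPTY                  PA applied the spooled messages
     */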
*/ +static void +LogicalParallelApplyLoop(shm_mq_handle *mqh) +{ + shm_mq_result shmq_res; + ErrorContextCallback errcallback; + MemoryContext oldcxt = CurrentMemoryContext; + + /* + * Init the ApplyMessageContext which we clean up after each replication + * protocol message. + */ + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); + + /* + * Push apply error context callback. Fields will be filled while applying + * a change. + */ + errcallback.callback = apply_error_callback; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + for (;;) + { + void *data; + Size len; + + ProcessParallelApplyInterrupts(); + + /* Ensure we are reading the data into our memory context. */ + MemoryContextSwitchTo(ApplyMessageContext); + + shmq_res = shm_mq_receive(mqh, &len, &data, true); + + if (shmq_res == SHM_MQ_SUCCESS) + { + StringInfoData s; + int c; + + if (len == 0) + elog(ERROR, "invalid message length"); + + s.cursor = 0; + s.maxlen = -1; + s.data = (char *) data; + s.len = len; + + /* + * The first byte of messages sent from leader apply worker to + * parallel apply workers can only be 'w'. + */ + c = pq_getmsgbyte(&s); + if (c != 'w') + elog(ERROR, "unexpected message \"%c\"", c); + + /* + * Ignore statistics fields that have been updated by the leader + * apply worker. + * + * XXX We can avoid sending the statistics fields from the leader + * apply worker but for that, it needs to rebuild the entire + * message by removing these fields which could be more work than + * simply ignoring these fields in the parallel apply worker. + */ + s.cursor += SIZE_STATS_MESSAGE; + + apply_dispatch(&s); + } + else if (shmq_res == SHM_MQ_WOULD_BLOCK) + { + /* Replay the changes from the file, if any. */ + if (!pa_process_spooled_messages_if_required()) + { + int rc; + + /* Wait for more work. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + } + else + { + Assert(shmq_res == SHM_MQ_DETACHED); + + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("lost connection to the logical replication apply worker"))); + } + + MemoryContextReset(ApplyMessageContext); + MemoryContextSwitchTo(oldcxt); + } + + /* Pop the error context stack. */ + error_context_stack = errcallback.previous; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Make sure the leader apply worker tries to read from our error queue one more + * time. This guards against the case where we exit uncleanly without sending + * an ErrorResponse, for example because some code calls proc_exit directly. + */ +static void +pa_shutdown(int code, Datum arg) +{ + SendProcSignal(MyLogicalRepWorker->apply_leader_pid, + PROCSIG_PARALLEL_APPLY_MESSAGE, + InvalidBackendId); + + dsm_detach((dsm_segment *) DatumGetPointer(arg)); +} + +/* + * Parallel apply worker entry point. + */ +void +ParallelApplyWorkerMain(Datum main_arg) +{ + ParallelApplyWorkerShared *shared; + dsm_handle handle; + dsm_segment *seg; + shm_toc *toc; + shm_mq *mq; + shm_mq_handle *mqh; + shm_mq_handle *error_mqh; + RepOriginId originid; + int worker_slot = DatumGetInt32(main_arg); + char originname[NAMEDATALEN]; + + /* Setup signal handling. 
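The 'w' messages consumed by this loop reuse the walreceiver framing, which is why the worker can skip a fixed-size header. Assuming 8-byte XLogRecPtr and TimestampTz, the layout is (illustrative summary):

    /*
     * offset 0     : 'w'        message kind, consumed by pq_getmsgbyte()
     * offsets 1-24 : start_lsn, end_lsn, send_time -- the SIZE_STATS_MESSAGE
     *                (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz) = 24) bytes
     *                skipped via s.cursor above
     * offset 25+   : the logical replication message, handed to apply_dispatch()
     */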
*/ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Attach to the dynamic shared memory segment for the parallel apply, and + * find its table of contents. + * + * Like parallel query, we don't need resource owner by this time. See + * ParallelWorkerMain. + */ + memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle)); + seg = dsm_attach(handle); + if (!seg) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + + toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg)); + if (!toc) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); + + /* Look up the shared information. */ + shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false); + MyParallelShared = shared; + + /* + * Attach to the message queue. + */ + mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false); + shm_mq_set_receiver(mq, MyProc); + mqh = shm_mq_attach(mq, seg, NULL); + + /* + * Primary initialization is complete. Now, we can attach to our slot. + * This is to ensure that the leader apply worker does not write data to + * the uninitialized memory queue. + */ + logicalrep_worker_attach(worker_slot); + + SpinLockAcquire(&MyParallelShared->mutex); + MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; + MyParallelShared->logicalrep_worker_slot_no = worker_slot; + SpinLockRelease(&MyParallelShared->mutex); + + /* + * Attach to the error queue. + */ + mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false); + shm_mq_set_sender(mq, MyProc); + error_mqh = shm_mq_attach(mq, seg, NULL); + + pq_redirect_to_shm_mq(seg, error_mqh); + pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid, + InvalidBackendId); + + MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = + MyLogicalRepWorker->reply_time = 0; + + InitializeApplyWorker(); + + /* Setup replication origin tracking. */ + StartTransactionCommand(); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + originname, sizeof(originname)); + originid = replorigin_by_name(originname, false); + + /* + * The parallel apply worker doesn't need to monopolize this replication + * origin which was already acquired by its leader process. + */ + replorigin_session_setup(originid, MyLogicalRepWorker->apply_leader_pid); + replorigin_session_origin = originid; + CommitTransactionCommand(); + + /* + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. + */ + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + invalidate_syncing_table_states, + (Datum) 0); + + set_apply_error_context_origin(originname); + + LogicalParallelApplyLoop(mqh); + + /* + * The parallel apply worker must not get here because the parallel apply + * worker will only stop when it receives a SIGTERM or SIGINT from the + * leader, or when there is an error. None of these cases will allow the + * code to reach here. + */ + Assert(false); +} + +/* + * Handle receipt of an interrupt indicating a parallel apply worker message. + * + * Note: this is called within a signal handler! All we can do is set a flag + * that will cause the next CHECK_FOR_INTERRUPTS() to invoke + * HandleParallelApplyMessages(). 
+ */ +void +HandleParallelApplyMessageInterrupt(void) +{ + InterruptPending = true; + ParallelApplyMessagePending = true; + SetLatch(MyLatch); +} + +/* + * Handle a single protocol message received from a single parallel apply + * worker. + */ +static void +HandleParallelApplyMessage(StringInfo msg) +{ + char msgtype; + + msgtype = pq_getmsgbyte(msg); + + switch (msgtype) + { + case 'E': /* ErrorResponse */ + { + ErrorData edata; + + /* Parse ErrorResponse. */ + pq_parse_errornotice(msg, &edata); + + /* + * If desired, add a context line to show that this is a + * message propagated from a parallel apply worker. Otherwise, + * it can sometimes be confusing to understand what actually + * happened. + */ + if (edata.context) + edata.context = psprintf("%s\n%s", edata.context, + _("logical replication parallel apply worker")); + else + edata.context = pstrdup(_("logical replication parallel apply worker")); + + /* + * Context beyond that should use the error context callbacks + * that were in effect in LogicalRepApplyLoop(). + */ + error_context_stack = apply_error_context_stack; + + /* + * The actual error must have been reported by the parallel + * apply worker. + */ + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication parallel apply worker exited due to error"), + errcontext("%s", edata.context))); + } + + /* + * Don't need to do anything about NoticeResponse and + * NotifyResponse as the logical replication worker doesn't need + * to send messages to the client. + */ + case 'N': + case 'A': + break; + + default: + elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)", + msgtype, msg->len); + } +} + +/* + * Handle any queued protocol messages received from parallel apply workers. + */ +void +HandleParallelApplyMessages(void) +{ + ListCell *lc; + MemoryContext oldcontext; + + static MemoryContext hpam_context = NULL; + + /* + * This is invoked from ProcessInterrupts(), and since some of the + * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential + * for recursive calls if more signals are received while this runs. It's + * unclear that recursive entry would be safe, and it doesn't seem useful + * even if it is safe, so let's block interrupts until done. + */ + HOLD_INTERRUPTS(); + + /* + * Moreover, CurrentMemoryContext might be pointing almost anywhere. We + * don't want to risk leaking data into long-lived contexts, so let's do + * our work here in a private context that we can reset on each use. + */ + if (!hpam_context) /* first time through? */ + hpam_context = AllocSetContextCreate(TopMemoryContext, + "HandleParallelApplyMessages", + ALLOCSET_DEFAULT_SIZES); + else + MemoryContextReset(hpam_context); + + oldcontext = MemoryContextSwitchTo(hpam_context); + + ParallelApplyMessagePending = false; + + foreach(lc, ParallelApplyWorkerPool) + { + shm_mq_result res; + Size nbytes; + void *data; + ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + /* + * The leader will detach from the error queue and set it to NULL + * before preparing to stop all parallel apply workers, so we don't + * need to handle error messages anymore. See + * logicalrep_worker_detach. 
+ */ + if (!winfo->error_mq_handle) + continue; + + res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true); + + if (res == SHM_MQ_WOULD_BLOCK) + continue; + else if (res == SHM_MQ_SUCCESS) + { + StringInfoData msg; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + HandleParallelApplyMessage(&msg); + pfree(msg.data); + } + else + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("lost connection to the logical replication parallel apply worker"))); + } + + MemoryContextSwitchTo(oldcontext); + + /* Might as well clear the context on our way out */ + MemoryContextReset(hpam_context); + + RESUME_INTERRUPTS(); +} + +/* + * Send the data to the specified parallel apply worker via shared-memory + * queue. + * + * Returns false if the attempt to send data via shared memory times out, true + * otherwise. + */ +bool +pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) +{ + int rc; + shm_mq_result result; + TimestampTz startTime = 0; + + Assert(!IsTransactionState()); + Assert(!winfo->serialize_changes); + +/* + * This timeout is a bit arbitrary but testing revealed that it is sufficient + * to send the message unless the parallel apply worker is waiting on some + * lock or there is a serious resource crunch. See the comments atop this file + * to know why we are using a non-blocking way to send the message. + */ +#define SHM_SEND_RETRY_INTERVAL_MS 1000 +#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) + + for (;;) + { + result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true); + + if (result == SHM_MQ_SUCCESS) + return true; + else if (result == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send data to shared-memory queue"))); + + Assert(result == SHM_MQ_WOULD_BLOCK); + + /* Wait before retrying. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + SHM_SEND_RETRY_INTERVAL_MS, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (startTime == 0) + startTime = GetCurrentTimestamp(); + else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), + SHM_SEND_TIMEOUT_MS)) + { + ereport(LOG, + (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", + winfo->shared->xid))); + return false; + } + } +} + +/* + * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means + * that the current data and any subsequent data for this transaction will be + * serialized to a file. This is done to prevent possible deadlocks with + * another parallel apply worker (refer to the comments atop this file). + */ +void +pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, + bool stream_locked) +{ + /* + * The parallel apply worker could be stuck for some reason (say waiting + * on some lock by other backend), so stop trying to send data directly to + * it and start serializing data to the file instead. + */ + winfo->serialize_changes = true; + + /* Initialize the stream fileset. */ + stream_start_internal(winfo->shared->xid, true); + + /* + * Acquires the stream lock if not already to make sure that the parallel + * apply worker will wait for the leader to release the stream lock until + * the end of the transaction. 
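Tying pa_send_data() to the partial-serialize switch above: the caller (in worker.c, not shown in this hunk) reacts to a false return roughly as follows; the spooling call shown is a hypothetical stand-in for the real file-write path:

    /* Leader-side sketch, illustrative only */
    if (!pa_send_data(winfo, s->len, s->data))
    {
        /* ~10s budget exhausted: switch this xact to partial-serialize mode */
        pa_switch_to_partial_serialize(winfo, false);
        /* hypothetical: append the change to the stream fileset instead */
        serialize_change_to_file(s);
    }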
+ */ + if (!stream_locked) + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS); +} + +/* + * Wait until the parallel apply worker's transaction state has reached or + * exceeded the given xact_state. + */ +static void +pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, + ParallelTransState xact_state) +{ + for (;;) + { + /* + * Stop if the transaction state has reached or exceeded the given + * xact_state. + */ + if (pa_get_xact_state(winfo->shared) >= xact_state) + break; + + /* Wait to be signalled. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + + /* Reset the latch so we don't spin. */ + ResetLatch(MyLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } +} + +/* + * Wait until the parallel apply worker's transaction finishes. + */ +static void +pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) +{ + /* + * Wait until the parallel apply worker set the state to + * PARALLEL_TRANS_STARTED which means it has acquired the transaction + * lock. This is to prevent leader apply worker from acquiring the + * transaction lock earlier than the parallel apply worker. + */ + pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED); + + /* + * Wait for the transaction lock to be released. This is required to + * detect deadlock among leader and parallel apply workers. Refer to the + * comments atop this file. + */ + pa_lock_transaction(winfo->shared->xid, AccessShareLock); + pa_unlock_transaction(winfo->shared->xid, AccessShareLock); + + /* + * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel + * apply worker failed while applying changes causing the lock to be + * released. + */ + if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("lost connection to the logical replication parallel apply worker"))); +} + +/* + * Set the transaction state for a given parallel apply worker. + */ +void +pa_set_xact_state(ParallelApplyWorkerShared *wshared, + ParallelTransState xact_state) +{ + SpinLockAcquire(&wshared->mutex); + wshared->xact_state = xact_state; + SpinLockRelease(&wshared->mutex); +} + +/* + * Get the transaction state for a given parallel apply worker. + */ +static ParallelTransState +pa_get_xact_state(ParallelApplyWorkerShared *wshared) +{ + ParallelTransState xact_state; + + SpinLockAcquire(&wshared->mutex); + xact_state = wshared->xact_state; + SpinLockRelease(&wshared->mutex); + + return xact_state; +} + +/* + * Cache the parallel apply worker information. + */ +void +pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo) +{ + stream_apply_worker = winfo; +} + +/* + * Form a unique savepoint name for the streaming transaction. + * + * Note that different subscriptions for publications on different nodes can + * receive same remote xid, so we need to use subscription id along with it. + * + * Returns the name in the supplied buffer. + */ +static void +pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp) +{ + snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid); +} + +/* + * Define a savepoint for a subxact in parallel apply worker if needed. + * + * The parallel apply worker can figure out if a new subtransaction was + * started by checking if the new change arrived with a different xid. 
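The savepoint naming scheme in pa_savepoint_name() is easy to sanity-check in isolation; this standalone snippet (with made-up subscription OID and remote xid values) mirrors it:

    #include <stdio.h>

    int main(void)
    {
        char spname[64];

        /* suboid = 16394 and remote xid = 739 are made-up example values */
        snprintf(spname, sizeof(spname), "pg_sp_%u_%u", 16394u, 739u);
        printf("%s\n", spname); /* prints pg_sp_16394_739 */
        return 0;
    }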
In that + * case define a named savepoint, so that we are able to rollback to it + * if required. + */ +void +pa_start_subtrans(TransactionId current_xid, TransactionId top_xid) +{ + if (current_xid != top_xid && + !list_member_xid(subxactlist, current_xid)) + { + MemoryContext oldctx; + char spname[NAMEDATALEN]; + + pa_savepoint_name(MySubscription->oid, current_xid, + spname, sizeof(spname)); + + elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname); + + /* We must be in transaction block to define the SAVEPOINT. */ + if (!IsTransactionBlock()) + { + if (!IsTransactionState()) + StartTransactionCommand(); + + BeginTransactionBlock(); + CommitTransactionCommand(); + } + + DefineSavepoint(spname); + + /* + * CommitTransactionCommand is needed to start a subtransaction after + * issuing a SAVEPOINT inside a transaction block (see + * StartSubTransaction()). + */ + CommitTransactionCommand(); + + oldctx = MemoryContextSwitchTo(TopTransactionContext); + subxactlist = lappend_xid(subxactlist, current_xid); + MemoryContextSwitchTo(oldctx); + } +} + +/* Reset the list that maintains subtransactions. */ +void +pa_reset_subtrans(void) +{ + /* + * We don't need to free this explicitly as the allocated memory will be + * freed at the transaction end. + */ + subxactlist = NIL; +} + +/* + * Handle STREAM ABORT message when the transaction was applied in a parallel + * apply worker. + */ +void +pa_stream_abort(LogicalRepStreamAbortData *abort_data) +{ + TransactionId xid = abort_data->xid; + TransactionId subxid = abort_data->subxid; + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = abort_data->abort_lsn; + replorigin_session_origin_timestamp = abort_data->abort_time; + + /* + * If the two XIDs are the same, it's in fact abort of toplevel xact, so + * just free the subxactlist. + */ + if (subxid == xid) + { + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); + + /* + * Release the lock as we might be processing an empty streaming + * transaction in which case the lock won't be released during + * transaction rollback. + * + * Note that it's ok to release the transaction lock before aborting + * the transaction because even if the parallel apply worker dies due + * to crash or some other reason, such a transaction would still be + * considered aborted. + */ + pa_unlock_transaction(xid, AccessExclusiveLock); + + AbortCurrentTransaction(); + + if (IsTransactionBlock()) + { + EndTransactionBlock(false); + CommitTransactionCommand(); + } + + pa_reset_subtrans(); + + pgstat_report_activity(STATE_IDLE, NULL); + } + else + { + /* OK, so it's a subxact. Rollback to the savepoint. */ + int i; + char spname[NAMEDATALEN]; + + pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname)); + + elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname); + + /* + * Search the subxactlist, determine the offset tracked for the + * subxact, and truncate the list. + * + * Note that for an empty sub-transaction we won't find the subxid + * here. + */ + for (i = list_length(subxactlist) - 1; i >= 0; i--) + { + TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i)); + + if (xid_tmp == subxid) + { + RollbackToSavepoint(spname); + CommitTransactionCommand(); + subxactlist = list_truncate(subxactlist, i); + break; + } + } + } +} + +/* + * Set the fileset state for a particular parallel apply worker. 
The fileset + * will be set once the leader worker serialized all changes to the file + * so that it can be used by parallel apply worker. + */ +void +pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state) +{ + SpinLockAcquire(&wshared->mutex); + wshared->fileset_state = fileset_state; + + if (fileset_state == FS_SERIALIZE_DONE) + { + Assert(am_leader_apply_worker()); + Assert(MyLogicalRepWorker->stream_fileset); + wshared->fileset = *MyLogicalRepWorker->stream_fileset; + } + + SpinLockRelease(&wshared->mutex); +} + +/* + * Get the fileset state for the current parallel apply worker. + */ +static PartialFileSetState +pa_get_fileset_state(void) +{ + PartialFileSetState fileset_state; + + Assert(am_parallel_apply_worker()); + + SpinLockAcquire(&MyParallelShared->mutex); + fileset_state = MyParallelShared->fileset_state; + SpinLockRelease(&MyParallelShared->mutex); + + return fileset_state; +} + +/* + * Helper functions to acquire and release a lock for each stream block. + * + * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a + * stream lock. + * + * Refer to the comments atop this file to see how the stream lock is used. + */ +void +pa_lock_stream(TransactionId xid, LOCKMODE lockmode) +{ + LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, + PARALLEL_APPLY_LOCK_STREAM, lockmode); +} + +void +pa_unlock_stream(TransactionId xid, LOCKMODE lockmode) +{ + UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, + PARALLEL_APPLY_LOCK_STREAM, lockmode); +} + +/* + * Helper functions to acquire and release a lock for each local transaction + * apply. + * + * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a + * transaction lock. + * + * Note that all the callers must pass a remote transaction ID instead of a + * local transaction ID as xid. This is because the local transaction ID will + * only be assigned while applying the first change in the parallel apply but + * it's possible that the first change in the parallel apply worker is blocked + * by a concurrently executing transaction in another parallel apply worker. We + * can only communicate the local transaction id to the leader after applying + * the first change so it won't be able to wait after sending the xact finish + * command using this lock. + * + * Refer to the comments atop this file to see how the transaction lock is + * used. + */ +void +pa_lock_transaction(TransactionId xid, LOCKMODE lockmode) +{ + LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, + PARALLEL_APPLY_LOCK_XACT, lockmode); +} + +void +pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode) +{ + UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, + PARALLEL_APPLY_LOCK_XACT, lockmode); +} + +/* + * Decrement the number of pending streaming blocks and wait on the stream lock + * if there is no pending block available. + */ +void +pa_decr_and_wait_stream_block(void) +{ + Assert(am_parallel_apply_worker()); + + /* + * It is only possible to not have any pending stream chunks when we are + * applying spooled messages. 
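Since both pairs of helpers above go through LockApplyTransactionForSession() with the same subscription and xid, only locktag_field4 keeps the two lock spaces apart (a reading aid):

    /*
     * LockApplyTransactionForSession(subid, xid, field4, mode)
     *   field4 = PARALLEL_APPLY_LOCK_STREAM (0): the per-chunk "stream" lock;
     *            held AccessExclusive by LA, touched AccessShare by PA to wait.
     *   field4 = PARALLEL_APPLY_LOCK_XACT (1): the per-transaction lock; held
     *            AccessExclusive by PA for the whole xact, touched AccessShare
     *            by LA at transaction finish commands.
     */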
+ */ + if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0) + { + if (pa_has_spooled_message_pending()) + return; + + elog(ERROR, "invalid pending streaming chunk 0"); + } + + if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0) + { + pa_lock_stream(MyParallelShared->xid, AccessShareLock); + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); + } +} + +/* + * Finish processing the streaming transaction in the leader apply worker. + */ +void +pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) +{ + Assert(am_leader_apply_worker()); + + /* + * Unlock the shared object lock so that parallel apply worker can + * continue to receive and apply changes. + */ + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); + + /* + * Wait for that worker to finish. This is necessary to maintain commit + * order which avoids failures due to transaction dependencies and + * deadlocks. + */ + pa_wait_for_xact_finish(winfo); + + if (!XLogRecPtrIsInvalid(remote_lsn)) + store_flush_position(remote_lsn, winfo->shared->last_commit_end); + + pa_free_worker(winfo); +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c3bc8ecc926..a53e23c679d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -822,10 +822,11 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, for (i = 0; i < parsed->nsubxacts; i++) { ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], - buf->record->EndRecPtr); + buf->record->EndRecPtr, abort_time); } - ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr, + abort_time); } /* update the decoding stats */ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index a69e371c050..afb7acddaa6 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -55,6 +55,7 @@ /* GUC variables */ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; +int max_parallel_apply_workers_per_subscription = 2; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -74,6 +75,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static int logicalrep_pa_worker_count(Oid subid); static bool on_commit_launcher_wakeup = false; @@ -152,8 +154,10 @@ get_subscription_list(void) * * This is only needed for cleaning up the shared memory in case the worker * fails to attach. + * + * Returns whether the attach was successful. */ -static void +static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle) @@ -169,11 +173,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - /* Worker either died or has started; no need to do anything. */ + /* Worker either died or has started. Return false if died. 
*/ if (!worker->in_use || worker->proc) { LWLockRelease(LogicalRepWorkerLock); - return; + return worker->in_use; } LWLockRelease(LogicalRepWorkerLock); @@ -188,7 +192,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, if (generation == worker->generation) logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); - return; + return false; } /* @@ -210,6 +214,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given * subscription id and relid. + * + * We are only interested in the leader apply worker or table sync worker. */ LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running) @@ -224,6 +230,10 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + /* Skip parallel apply workers. */ + if (isParallelApplyWorker(w)) + continue; + if (w->in_use && w->subid == subid && w->relid == relid && (!only_running || w->proc)) { @@ -260,11 +270,13 @@ logicalrep_workers_find(Oid subid, bool only_running) } /* - * Start new apply background worker, if possible. + * Start new logical replication background worker, if possible. + * + * Returns true on success, false on failure. */ -void +bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid) + Oid relid, dsm_handle subworker_dsm) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -273,7 +285,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, int slot = 0; LogicalRepWorker *worker = NULL; int nsyncworkers; + int nparallelapplyworkers; TimestampTz now; + bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID); + + /* Sanity check - tablesync worker cannot be a subworker */ + Assert(!(is_parallel_apply_worker && OidIsValid(relid))); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -351,7 +368,20 @@ retry: if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); - return; + return false; + } + + nparallelapplyworkers = logicalrep_pa_worker_count(subid); + + /* + * Return false if the number of parallel apply workers reached the limit + * per subscription. + */ + if (is_parallel_apply_worker && + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) + { + LWLockRelease(LogicalRepWorkerLock); + return false; } /* @@ -365,7 +395,7 @@ retry: (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of logical replication worker slots"), errhint("You might need to increase max_logical_replication_workers."))); - return; + return false; } /* Prepare the worker slot. */ @@ -380,6 +410,8 @@ retry: worker->relstate = SUBREL_STATE_UNKNOWN; worker->relstate_lsn = InvalidXLogRecPtr; worker->stream_fileset = NULL; + worker->apply_leader_pid = is_parallel_apply_worker ? 
MyProcPid : InvalidPid; + worker->parallel_apply = is_parallel_apply_worker; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -397,19 +429,34 @@ retry: BGWORKER_BACKEND_DATABASE_CONNECTION; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + + if (is_parallel_apply_worker) + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain"); + else + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + if (OidIsValid(relid)) snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication worker for subscription %u sync %u", subid, relid); + else if (is_parallel_apply_worker) + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication parallel apply worker for subscription %u", subid); else snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u", subid); - snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); + "logical replication apply worker for subscription %u", subid); + + if (is_parallel_apply_worker) + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); + else + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker"); bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; bgw.bgw_main_arg = Int32GetDatum(slot); + if (is_parallel_apply_worker) + memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { /* Failed to start worker, so clean up the worker slot. */ @@ -422,33 +469,23 @@ retry: (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("out of background worker slots"), errhint("You might need to increase max_worker_processes."))); - return; + return false; } /* Now wait until it attaches. */ - WaitForReplicationWorkerAttach(worker, generation, bgw_handle); + return WaitForReplicationWorkerAttach(worker, generation, bgw_handle); } /* - * Stop the logical replication worker for subid/relid, if any, and wait until - * it detaches from the slot. + * Internal function to stop the worker and wait until it detaches from the + * slot. */ -void -logicalrep_worker_stop(Oid subid, Oid relid) +static void +logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) { - LogicalRepWorker *worker; uint16 generation; - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - - worker = logicalrep_worker_find(subid, relid, false); - - /* No worker, nothing to do. */ - if (!worker) - { - LWLockRelease(LogicalRepWorkerLock); - return; - } + Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED)); /* * Remember which generation was our worker so we can check if what we see @@ -486,10 +523,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) * different, meaning that a different worker has taken the slot. */ if (!worker->in_use || worker->generation != generation) - { - LWLockRelease(LogicalRepWorkerLock); return; - } /* Worker has assigned proc, so it has started. */ if (worker->proc) @@ -497,7 +531,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* Now terminate the worker ... */ - kill(worker->proc->pid, SIGTERM); + kill(worker->proc->pid, signo); /* ... and wait for it to die. */ for (;;) @@ -523,6 +557,53 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); } +} + +/* + * Stop the logical replication worker for subid/relid, if any. 
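Note the memcpy of subworker_dsm into bgw_extra above: the registration record is copied into the newly launched process, so only flat bytes can cross, and a small DSM handle fits. A minimal sketch of that round trip, assuming made-up dsm_handle_t and RegistrationBlob types rather than the real dsm_handle and BackgroundWorker:

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    #define EXTRA_LEN 128           /* stands in for BGW_EXTRALEN */

    typedef uint32_t dsm_handle_t;  /* illustrative stand-in for dsm_handle */

    typedef struct
    {
        char    extra[EXTRA_LEN];
    } RegistrationBlob;

    int
    main(void)
    {
        RegistrationBlob bgw = {{0}};
        dsm_handle_t in = 42;
        dsm_handle_t out = 0;

        /* Registering side: stash the raw handle in the opaque extra area. */
        memcpy(bgw.extra, &in, sizeof(in));

        /* Worker side: recover the handle before attaching to the segment. */
        memcpy(&out, bgw.extra, sizeof(out));

        printf("recovered handle: %u\n", (unsigned) out);
        return 0;
    }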
+ */ +void +logicalrep_worker_stop(Oid subid, Oid relid) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(subid, relid, false); + + if (worker) + { + Assert(!isParallelApplyWorker(worker)); + logicalrep_worker_stop_internal(worker, SIGTERM); + } + + LWLockRelease(LogicalRepWorkerLock); +} + +/* + * Stop the logical replication parallel apply worker corresponding to the + * input slot number. + * + * Note that the function sends SIGINT instead of SIGTERM to the parallel apply + * worker so that the worker exits cleanly. + */ +void +logicalrep_pa_worker_stop(int slot_no, uint16 generation) +{ + LogicalRepWorker *worker; + + Assert(slot_no >= 0 && slot_no < max_logical_replication_workers); + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = &LogicalRepCtx->workers[slot_no]; + Assert(isParallelApplyWorker(worker)); + + /* + * Only stop the worker if the generation matches and the worker is alive. + */ + if (worker->generation == generation && worker->proc) + logicalrep_worker_stop_internal(worker, SIGINT); LWLockRelease(LogicalRepWorkerLock); } @@ -595,11 +676,40 @@ logicalrep_worker_attach(int slot) } /* - * Detach the worker (cleans up the worker info). + * Stop the parallel apply workers if any, and detach the leader apply worker + * (cleans up the worker info). */ static void logicalrep_worker_detach(void) { + /* Stop the parallel apply workers. */ + if (am_leader_apply_worker()) + { + List *workers; + ListCell *lc; + + /* + * Detach from the error_mq_handle for all parallel apply workers + * before terminating them. This prevents the leader apply worker from + * receiving the worker termination message and sending it to logs + * when the same is already done by the parallel worker. + */ + pa_detach_all_error_mq(); + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true); + foreach(lc, workers) + { + LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); + + if (isParallelApplyWorker(w)) + logicalrep_worker_stop_internal(w, SIGTERM); + } + + LWLockRelease(LogicalRepWorkerLock); + } + /* Block concurrent access. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); @@ -622,6 +732,8 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) worker->userid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; + worker->apply_leader_pid = InvalidPid; + worker->parallel_apply = false; } /* @@ -653,6 +765,13 @@ logicalrep_worker_onexit(int code, Datum arg) if (MyLogicalRepWorker->stream_fileset != NULL) FileSetDeleteAll(MyLogicalRepWorker->stream_fileset); + /* + * Session level locks may be acquired outside of a transaction in + * parallel apply mode and will not be released when the worker + * terminates, so manually release all locks before the worker exits. + */ + LockReleaseAll(DEFAULT_LOCKMETHOD, true); + ApplyLauncherWakeup(); } @@ -681,6 +800,33 @@ logicalrep_sync_worker_count(Oid subid) } /* + * Count the number of registered (but not necessarily running) parallel apply + * workers for a subscription. + */ +static int +logicalrep_pa_worker_count(Oid subid) +{ + int i; + int res = 0; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* + * Scan all attached parallel apply workers, only counting those which + * have the given subscription id.
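The generation check in logicalrep_pa_worker_stop above protects against signalling a slot that has been recycled for a different worker since the caller last looked. A self-contained sketch of that pattern, with an illustrative WorkerSlot that is not the real LogicalRepWorker layout:

    #include <signal.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <sys/types.h>
    #include <unistd.h>

    /* Illustrative slot layout; not the actual LogicalRepWorker struct. */
    typedef struct
    {
        bool        in_use;
        uint16_t    generation;     /* bumped each time the slot is recycled */
        pid_t       pid;            /* 0 until a worker attaches */
    } WorkerSlot;

    /*
     * Signal the worker only if the slot still holds the incarnation the
     * caller observed; otherwise the slot now belongs to someone else.
     */
    static bool
    stop_worker(WorkerSlot *slot, uint16_t seen_generation, int signo)
    {
        if (!slot->in_use || slot->generation != seen_generation ||
            slot->pid == 0)
            return false;
        return kill(slot->pid, signo) == 0;
    }

    int
    main(void)
    {
        WorkerSlot slot = {true, 7, getpid()};

        /* Generation matches: signal 0 just probes that the pid exists. */
        printf("stop gen 7: %d\n", stop_worker(&slot, 7, 0));

        /* Stale generation: the slot was reused, so do nothing. */
        printf("stop gen 6: %d\n", stop_worker(&slot, 6, SIGTERM));
        return 0;
    }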
+ */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->subid == subid && isParallelApplyWorker(w)) + res++; + } + + return res; +} + +/* * ApplyLauncherShmemSize * Compute space needed for replication launcher shared memory */ @@ -869,7 +1015,7 @@ ApplyLauncherMain(Datum main_arg) wait_time = wal_retrieve_retry_interval; logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid); + sub->owner, InvalidOid, DSM_HANDLE_INVALID); } } @@ -952,6 +1098,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) if (OidIsValid(subid) && worker.subid != subid) continue; + /* Skip if this is a parallel apply worker */ + if (isParallelApplyWorker(&worker)) + continue; + worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 7144d7a8a42..d48cd4c5901 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -1,6 +1,7 @@ # Copyright (c) 2022-2023, PostgreSQL Global Development Group backend_sources += files( + 'applyparallelworker.c', 'decode.c', 'launcher.c', 'logical.c', diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index fc86768ed64..b754c43840f 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg) * array doesn't have to be searched when calling * replorigin_session_advance(). * - * Obviously only one such cached origin can exist per process and the current - * cached value can only be set again after the previous value is torn down - * with replorigin_session_reset(). + * Normally only one such cached origin can exist per process so the cached + * value can only be set again after the previous value is torn down with + * replorigin_session_reset(). For this normal case pass acquired_by = 0 + * (meaning the slot is not allowed to be already acquired by another process). + * + * However, sometimes multiple processes can safely re-use the same origin slot + * (for example, multiple parallel apply processes can safely use the same + * origin, provided they maintain commit order by allowing only one process to + * commit at a time). For this case the first process must pass acquired_by = + * 0, and then the other processes sharing that same origin can pass + * acquired_by = PID of the first process. 
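The acquired_by protocol described above boils down to two checks, which the replorigin_session_setup hunk that follows implements against shared memory. A hedged standalone rendering, with a hypothetical OriginSlot in place of the real replication state:

    #include <stdio.h>
    #include <sys/types.h>
    #include <unistd.h>

    /* Illustrative session slot; not PostgreSQL's ReplicationState. */
    typedef struct
    {
        pid_t   acquired_by;    /* 0 when free */
    } OriginSlot;

    /*
     * acquired_by == 0: the caller insists on a free slot (the normal case).
     * acquired_by == <pid>: the caller may share a slot that <pid> already
     * owns, as a parallel apply worker shares the leader's origin.
     */
    static int
    setup_session(OriginSlot *slot, pid_t me, pid_t acquired_by)
    {
        if (slot->acquired_by != 0 && acquired_by == 0)
            return -1;              /* "origin already active" */
        if (acquired_by == 0)
            slot->acquired_by = me;
        else if (slot->acquired_by != acquired_by)
            return -1;              /* wrong owner: refuse to share */
        return 0;
    }

    int
    main(void)
    {
        OriginSlot slot = {0};
        pid_t leader = 100, worker = 200, other = 300;

        printf("leader acquires: %d\n", setup_session(&slot, leader, 0));
        printf("worker shares:   %d\n", setup_session(&slot, worker, leader));
        printf("third party:     %d\n", setup_session(&slot, other, 0));
        return 0;
    }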
*/ void -replorigin_session_setup(RepOriginId node) +replorigin_session_setup(RepOriginId node, int acquired_by) { static bool registered_cleanup; int i; @@ -1122,7 +1130,7 @@ replorigin_session_setup(RepOriginId node) if (curstate->roident != node) continue; - else if (curstate->acquired_by != 0) + else if (curstate->acquired_by != 0 && acquired_by == 0) { ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), @@ -1153,7 +1161,11 @@ replorigin_session_setup(RepOriginId node) Assert(session_replication_state->roident != InvalidRepOriginId); - session_replication_state->acquired_by = MyProcPid; + if (acquired_by == 0) + session_replication_state->acquired_by = MyProcPid; + else if (session_replication_state->acquired_by != acquired_by) + elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d", + node, acquired_by); LWLockRelease(ReplicationOriginLock); @@ -1337,7 +1349,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS) name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); origin = replorigin_by_name(name, false); - replorigin_session_setup(origin); + replorigin_session_setup(origin, 0); replorigin_session_origin = origin; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index d00e90dd5e0..3a9d53a61ed 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -1164,10 +1164,14 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) /* * Write STREAM ABORT to the output stream. Note that xid and subxid will be * same for the top-level transaction abort. + * + * If write_abort_info is true, send the abort_lsn and abort_time fields, + * otherwise don't. */ void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, - TransactionId subxid) + TransactionId subxid, XLogRecPtr abort_lsn, + TimestampTz abort_time, bool write_abort_info) { pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); @@ -1176,19 +1180,40 @@ logicalrep_write_stream_abort(StringInfo out, TransactionId xid, /* transaction ID */ pq_sendint32(out, xid); pq_sendint32(out, subxid); + + if (write_abort_info) + { + pq_sendint64(out, abort_lsn); + pq_sendint64(out, abort_time); + } } /* * Read STREAM ABORT from the output stream. + * + * If read_abort_info is true, read the abort_lsn and abort_time fields, + * otherwise don't. */ void -logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, - TransactionId *subxid) +logicalrep_read_stream_abort(StringInfo in, + LogicalRepStreamAbortData *abort_data, + bool read_abort_info) { - Assert(xid && subxid); + Assert(abort_data); + + abort_data->xid = pq_getmsgint(in, 4); + abort_data->subxid = pq_getmsgint(in, 4); - *xid = pq_getmsgint(in, 4); - *subxid = pq_getmsgint(in, 4); + if (read_abort_info) + { + abort_data->abort_lsn = pq_getmsgint64(in); + abort_data->abort_time = pq_getmsgint64(in); + } + else + { + abort_data->abort_lsn = InvalidXLogRecPtr; + abort_data->abort_time = 0; + } } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 29a018c58af..54ee824e6c4 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2873,7 +2873,8 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, * disk. 
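The STREAM ABORT message in the proto.c hunks above gains two optional trailing fields that are read back only when both sides know they were written. The standalone sketch below shows that optional-tail pattern with a toy byte buffer; put_u32, get_u64 and friends are assumptions standing in for the pq_send and pq_getmsg families, and native byte order is used for brevity where the real protocol is big-endian.

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Toy buffer codec; not the PostgreSQL wire routines. */
    typedef struct
    {
        uint8_t data[64];
        size_t  len, pos;
    } Buf;

    static void put_u32(Buf *b, uint32_t v) { memcpy(b->data + b->len, &v, 4); b->len += 4; }
    static void put_u64(Buf *b, uint64_t v) { memcpy(b->data + b->len, &v, 8); b->len += 8; }
    static uint32_t get_u32(Buf *b) { uint32_t v; memcpy(&v, b->data + b->pos, 4); b->pos += 4; return v; }
    static uint64_t get_u64(Buf *b) { uint64_t v; memcpy(&v, b->data + b->pos, 8); b->pos += 8; return v; }

    typedef struct { uint32_t xid, subxid; uint64_t lsn, time; } AbortMsg;

    /* Both sides must agree, via the protocol, on whether the tail exists. */
    static void
    write_abort(Buf *b, const AbortMsg *m, bool with_info)
    {
        put_u32(b, m->xid);
        put_u32(b, m->subxid);
        if (with_info) { put_u64(b, m->lsn); put_u64(b, m->time); }
    }

    static void
    read_abort(Buf *b, AbortMsg *m, bool with_info)
    {
        m->xid = get_u32(b);
        m->subxid = get_u32(b);
        if (with_info) { m->lsn = get_u64(b); m->time = get_u64(b); }
        else { m->lsn = 0; m->time = 0; }  /* defaults, cf. InvalidXLogRecPtr */
    }

    int
    main(void)
    {
        Buf b = {{0}, 0, 0};
        AbortMsg in = {10, 11, 12345, 67890}, out;

        write_abort(&b, &in, true);
        read_abort(&b, &out, true);
        printf("xid=%u subxid=%u lsn=%llu\n", out.xid, out.subxid,
               (unsigned long long) out.lsn);
        return 0;
    }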
*/ void -ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + TimestampTz abort_time) { ReorderBufferTXN *txn; @@ -2884,6 +2885,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) if (txn == NULL) return; + txn->xact_time.abort_time = abort_time; + /* For streamed transactions notify the remote node about the abort. */ if (rbtxn_is_streamed(txn)) { diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 09b3e8b32ac..38dfce71296 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -14,7 +14,7 @@ * The initial data synchronization is done separately for each table, * in a separate apply worker that only fetches the initial snapshot data * from the publisher and then synchronizes the position in the stream with - * the main apply worker. + * the leader apply worker. * * There are several reasons for doing the synchronization this way: * - It allows us to parallelize the initial data synchronization @@ -153,7 +153,7 @@ finish_sync_worker(void) get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); - /* Find the main apply worker and signal it. */ + /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); /* Stop gracefully */ @@ -588,7 +588,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, - rstate->relid); + rstate->relid, + DSM_HANDLE_INVALID); hentry->last_start_time = now; } } @@ -636,6 +637,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) void process_syncing_tables(XLogRecPtr current_lsn) { + /* + * Skip for parallel apply workers because they only operate on tables + * that are in a READY state. See pa_can_start() and + * should_apply_changes_for_rel(). + */ + if (am_parallel_apply_worker()) + return; + if (am_tablesync_worker()) process_syncing_tables_for_sync(current_lsn); else @@ -1254,7 +1263,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* * Here we use the slot name instead of the subscription name as the - * application_name, so that it is different from the main apply worker, + * application_name, so that it is different from the leader apply worker, * so that synchronous replication can distinguish them. */ LogRepWorkerWalRcvConn = @@ -1302,7 +1311,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * time this tablesync was launched. */ originid = replorigin_by_name(originname, false); - replorigin_session_setup(originid); + replorigin_session_setup(originid, 0); replorigin_session_origin = originid; *origin_startpos = replorigin_session_get_progress(false); @@ -1413,7 +1422,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) true /* go backward */ , true /* WAL log */ ); UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - replorigin_session_setup(originid); + replorigin_session_setup(originid, 0); replorigin_session_origin = originid; } else @@ -1468,8 +1477,8 @@ copy_table_done: SpinLockRelease(&MyLogicalRepWorker->relmutex); /* - * Finally, wait until the main apply worker tells us to catch up and then - * return to let LogicalRepApplyLoop do it. + * Finally, wait until the leader apply worker tells us to catch up and + * then return to let LogicalRepApplyLoop do it. 
*/ wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index f8649e142c3..79cda394453 100644..100755 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -22,8 +22,12 @@ * STREAMED TRANSACTIONS * --------------------- * Streamed transactions (large transactions exceeding a memory limit on the - * upstream) are not applied immediately, but instead, the data is written - * to temporary files and then applied at once when the final commit arrives. + * upstream) are applied using one of two approaches: + * + * 1) Write to temporary files and apply when the final commit arrives + * + * This approach is used when the user has set the subscription's streaming + * option as on. * * Unlike the regular (non-streamed) case, handling streamed transactions has * to handle aborts of both the toplevel transaction and subtransactions. This @@ -50,6 +54,12 @@ * the file we desired across multiple stream-open calls for the same * transaction. * + * 2) Parallel apply workers. + * + * This approach is used when the user has set the subscription's streaming + * option as parallel. See logical/applyparallelworker.c for information about + * this approach. + * * TWO_PHASE TRANSACTIONS * ---------------------- * Two phase transactions are replayed at prepare and then committed or @@ -233,7 +243,52 @@ typedef struct ApplyErrorCallbackArg char *origin_name; } ApplyErrorCallbackArg; -static ApplyErrorCallbackArg apply_error_callback_arg = +/* + * The action to be taken for the changes in the transaction. + * + * TRANS_LEADER_APPLY: + * This action means that we are in the leader apply worker and changes of the + * transaction are applied directly by the worker. + * + * TRANS_LEADER_SERIALIZE: + * This action means that we are in the leader apply worker or table sync + * worker. Changes are written to temporary files and then applied when the + * final commit arrives. + * + * TRANS_LEADER_SEND_TO_PARALLEL: + * This action means that we are in the leader apply worker and need to send + * the changes to the parallel apply worker. + * + * TRANS_LEADER_PARTIAL_SERIALIZE: + * This action means that we are in the leader apply worker and have sent some + * changes directly to the parallel apply worker and the remaining changes are + * serialized to a file, due to timeout while sending data. The parallel apply + * worker will apply these serialized changes when the final commit arrives. + * + * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to + * serializing changes, the leader worker also needs to serialize the + * STREAM_XXX message to a file, and wait for the parallel apply worker to + * finish the transaction when processing the transaction finish command. So + * this new action was introduced to keep the code and logic clear. + * + * TRANS_PARALLEL_APPLY: + * This action means that we are in the parallel apply worker and changes of + * the transaction are applied directly by the worker. + */ +typedef enum +{ + /* The action for non-streaming transactions. */ + TRANS_LEADER_APPLY, + + /* Actions for streaming transactions. 
*/ + TRANS_LEADER_SERIALIZE, + TRANS_LEADER_SEND_TO_PARALLEL, + TRANS_LEADER_PARTIAL_SERIALIZE, + TRANS_PARALLEL_APPLY +} TransApplyAction; + +/* errcontext tracker */ +ApplyErrorCallbackArg apply_error_callback_arg = { .command = 0, .rel = NULL, @@ -243,7 +298,9 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .origin_name = NULL, }; -static MemoryContext ApplyMessageContext = NULL; +ErrorContextCallback *apply_error_context_stack = NULL; + +MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; /* per stream context for streaming transactions */ @@ -265,16 +322,25 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; /* + * The number of changes applied by parallel apply worker during one streaming + * block. + */ +static uint32 parallel_stream_nchanges = 0; + +/* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. * Once we start skipping changes, we don't stop it until we skip all changes of * the transaction even if pg_subscription is updated and MySubscription->skiplsn - * gets changed or reset during that. Also, in streaming transaction cases, we - * don't skip receiving and spooling the changes since we decide whether or not + * gets changed or reset during that. Also, in streaming transaction cases (streaming = on), + * we don't skip receiving and spooling the changes since we decide whether or not * to skip applying the changes when starting to apply changes. The subskiplsn is * cleared after successfully skipping the transaction or applying non-empty * transaction. The latter prevents the mistakenly specified subskiplsn from - * being left. + * being left. Note that we cannot skip the streaming transactions when using + * parallel apply workers because we cannot get the finish LSN before applying + * the changes. So, we don't start parallel apply worker when finish LSN is set + * by the user. */ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) @@ -314,23 +380,16 @@ static inline void cleanup_subxact_info(void); /* * Serialize and deserialize changes for a toplevel transaction. 
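To make the dispatch over the TRANS_* actions documented above concrete, here is a small, hedged chooser. The boolean inputs are assumptions that stand in for the real bookkeeping (an assigned parallel worker, the partial-serialize fallback, being mid-stream); this is not the actual get_transaction_apply_action.

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum
    {
        TRANS_LEADER_APPLY,
        TRANS_LEADER_SERIALIZE,
        TRANS_LEADER_SEND_TO_PARALLEL,
        TRANS_LEADER_PARTIAL_SERIALIZE,
        TRANS_PARALLEL_APPLY
    } TransApplyAction;

    static TransApplyAction
    choose_action(bool in_parallel_worker, bool has_assigned_worker,
                  bool partially_serialized, bool in_streamed_xact)
    {
        if (in_parallel_worker)
            return TRANS_PARALLEL_APPLY;
        if (has_assigned_worker)
            return partially_serialized ? TRANS_LEADER_PARTIAL_SERIALIZE
                                        : TRANS_LEADER_SEND_TO_PARALLEL;
        return in_streamed_xact ? TRANS_LEADER_SERIALIZE : TRANS_LEADER_APPLY;
    }

    int
    main(void)
    {
        /* Leader with an assigned worker: send the change over. */
        printf("%d\n", (int) choose_action(false, true, false, true));

        /* Leader outside any stream: apply directly. */
        printf("%d\n", (int) choose_action(false, false, false, false));
        return 0;
    }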
*/ -static void stream_cleanup_files(Oid subid, TransactionId xid); static void stream_open_file(Oid subid, TransactionId xid, bool first_segment); static void stream_write_change(char action, StringInfo s); +static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); -static void store_flush_position(XLogRecPtr remote_lsn); - -static void maybe_reread_subscription(void); - static void DisableSubscriptionAndExit(void); -/* prototype needed because of stream_commit */ -static void apply_dispatch(StringInfo s); - static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -354,19 +413,32 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -/* Common streaming function to apply all the spooled messages */ -static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); /* Functions for apply error callback */ -static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +static TransApplyAction get_transaction_apply_action(TransactionId xid, + ParallelApplyWorkerInfo **winfo); + +/* + * Return the name of the logical replication worker. + */ +static const char * +get_worker_name(void) +{ + if (am_tablesync_worker()) + return _("logical replication table synchronization worker"); + else if (am_parallel_apply_worker()) + return _("logical replication parallel apply worker"); + else + return _("logical replication apply worker"); +} + /* * Form the origin name for the subscription. * @@ -396,19 +468,43 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, * * This is mainly needed for initial relation data sync as that runs in * separate worker process running in parallel and we need some way to skip - * changes coming to the main apply worker during the sync of a table. + * changes coming to the leader apply worker during the sync of a table. * * Note we need to do smaller or equals comparison for SYNCDONE state because * it might hold position of end of initial slot consistent point WAL * record + 1 (ie start of next record) and next record can be COMMIT of * transaction we are now processing (which is what we set remote_final_lsn * to in apply_handle_begin). + * + * Note that for streaming transactions that are being applied in the parallel + * apply worker, we disallow applying changes if the target table in the + * subscription is not in the READY state, because we cannot decide whether to + * apply the change as we won't know remote_final_lsn by that time. + * + * We already checked this in pa_can_start() before assigning the + * streaming transaction to the parallel worker, but it also needs to be + * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH + * PUBLICATION in parallel, the new table can be added to pg_subscription_rel + * while applying this transaction. 
*/ static bool should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) { if (am_tablesync_worker()) return MyLogicalRepWorker->relid == rel->localreloid; + else if (am_parallel_apply_worker()) + { + /* We don't synchronize rel's that are in unknown state. */ + if (rel->state != SUBREL_STATE_READY && + rel->state != SUBREL_STATE_UNKNOWN) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication parallel apply worker for subscription \"%s\" will stop", + MySubscription->name), + errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized."))); + + return rel->state == SUBREL_STATE_READY; + } else return (rel->state == SUBREL_STATE_READY || (rel->state == SUBREL_STATE_SYNCDONE && @@ -454,43 +550,110 @@ end_replication_step(void) } /* - * Handle streamed transactions. + * Handle streamed transactions for both the leader apply worker and the + * parallel apply workers. + * + * In the streaming case (receiving a block of the streamed transaction), for + * serialize mode, simply redirect it to a file for the proper toplevel + * transaction, and for parallel mode, the leader apply worker will send the + * changes to parallel apply workers and the parallel apply worker will define + * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE + * messages will be applied by both leader apply worker and parallel apply + * workers). * - * If in streaming mode (receiving a block of streamed transaction), we - * simply redirect it to a file for the proper toplevel transaction. + * Returns true for streamed transactions (when the change is either serialized + * to file or sent to parallel apply worker), false otherwise (regular mode or + * needs to be processed by parallel apply worker). * - * Returns true for streamed transactions, false otherwise (regular mode). + * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION + * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent + * to a parallel apply worker. */ static bool handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) { - TransactionId xid; + TransactionId current_xid; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + StringInfoData original_msg; + + apply_action = get_transaction_apply_action(stream_xid, &winfo); /* not in streaming mode */ - if (!in_streamed_transaction) + if (apply_action == TRANS_LEADER_APPLY) return false; - Assert(stream_fd != NULL); Assert(TransactionIdIsValid(stream_xid)); /* + * The parallel apply worker needs the xid in this message to decide + * whether to define a savepoint, so save the original message that has + * not moved the cursor after the xid. We will serialize this message to a + * file in PARTIAL_SERIALIZE mode. + */ + original_msg = *s; + + /* * We should have received XID of the subxact as the first part of the * message, so extract it. */ - xid = pq_getmsgint(s, 4); + current_xid = pq_getmsgint(s, 4); - if (!TransactionIdIsValid(xid)) + if (!TransactionIdIsValid(current_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - /* Add the new subxact to the array (unless already there). */ - subxact_info_add(xid); + switch (apply_action) + { + case TRANS_LEADER_SERIALIZE: + Assert(stream_fd); + + /* Add the new subxact to the array (unless already there). 
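The "save the original message" trick above (original_msg = *s) works because StringInfo-style buffers are small structs: copying one duplicates the read cursor while both copies alias the same bytes. A minimal illustration with a toy MsgBuf in place of StringInfoData:

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Minimal stand-in for StringInfoData: data + length + read cursor. */
    typedef struct
    {
        const char *data;
        int         len;
        int         cursor;
    } MsgBuf;

    static uint32_t
    read_u32(MsgBuf *s)
    {
        uint32_t v;

        memcpy(&v, s->data + s->cursor, sizeof(v));
        s->cursor += sizeof(v);
        return v;
    }

    int
    main(void)
    {
        char raw[8];
        uint32_t xid = 7, payload = 99;
        MsgBuf s, original_msg;

        memcpy(raw, &xid, 4);
        memcpy(raw + 4, &payload, 4);
        s = (MsgBuf){raw, (int) sizeof(raw), 0};

        /*
         * A struct copy duplicates the cursor, not the bytes, so the saved
         * message still starts at the xid even after we consume it below.
         */
        original_msg = s;

        printf("xid=%u (cursor now %d)\n", read_u32(&s), s.cursor);
        printf("saved message cursor: %d\n", original_msg.cursor);
        return 0;
    }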
*/ + subxact_info_add(current_xid); - /* write the change to the current file */ - stream_write_change(action, s); + /* Write the change to the current file */ + stream_write_change(action, s); + return true; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * XXX The publisher side doesn't always send relation/type update + * messages after the streaming transaction, so also update the + * relation/type in leader apply worker. See function + * cleanup_rel_sync_cache. + */ + if (pa_send_data(winfo, s->len, s->data)) + return (action != LOGICAL_REP_MSG_RELATION && + action != LOGICAL_REP_MSG_TYPE); + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, false); - return true; + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(action, &original_msg); + + /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */ + return (action != LOGICAL_REP_MSG_RELATION && + action != LOGICAL_REP_MSG_TYPE); + + case TRANS_PARALLEL_APPLY: + parallel_stream_nchanges += 1; + + /* Define a savepoint for a subxact if needed. */ + pa_start_subtrans(current_xid, stream_xid); + return false; + + default: + Assert(false); + return false; /* silence compiler warning */ + } } /* @@ -928,8 +1091,11 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) * BeginTransactionBlock is necessary to balance the EndTransactionBlock * called within the PrepareTransactionBlock below. */ - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. */ + if (!IsTransactionBlock()) + { + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + } /* * Update origin state so we can restart streaming from correct position @@ -976,7 +1142,7 @@ apply_handle_prepare(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; @@ -998,6 +1164,12 @@ apply_handle_prepare(StringInfo s) /* * Handle a COMMIT PREPARED of a previously PREPARED transaction. + * + * Note that we don't need to wait here if the transaction was prepared in a + * parallel apply worker. In that case, we have already waited for the prepare + * to finish in apply_handle_stream_prepare() which will ensure all the + * operations in that transaction have happened in the subscriber, so no + * concurrent transaction can cause deadlock or transaction dependency issues. */ static void apply_handle_commit_prepared(StringInfo s) @@ -1027,7 +1199,7 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1041,6 +1213,12 @@ apply_handle_commit_prepared(StringInfo s) /* * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION. + * + * Note that we don't need to wait here if the transaction was prepared in a + * parallel apply worker. In that case, we have already waited for the prepare + * to finish in apply_handle_stream_prepare() which will ensure all the + * operations in that transaction have happened in the subscriber, so no + * concurrent transaction can cause deadlock or transaction dependency issues. 
*/ static void apply_handle_rollback_prepared(StringInfo s) @@ -1082,7 +1260,7 @@ apply_handle_rollback_prepared(StringInfo s) pgstat_report_stat(false); - store_flush_position(rollback_data.rollback_end_lsn); + store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1094,15 +1272,16 @@ apply_handle_rollback_prepared(StringInfo s) /* * Handle STREAM PREPARE. - * - * Logic is in two parts: - * 1. Replay all the spooled operations - * 2. Mark the transaction as prepared */ static void apply_handle_stream_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; if (in_streamed_transaction) ereport(ERROR, @@ -1118,24 +1297,98 @@ apply_handle_stream_prepare(StringInfo s) logicalrep_read_stream_prepare(s, &prepare_data); set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn); - elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); + apply_action = get_transaction_apply_action(prepare_data.xid, &winfo); - /* Replay all the spooled operations. */ - apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); + switch (apply_action) + { + case TRANS_LEADER_SERIALIZE: - /* Mark the transaction as prepared. */ - apply_handle_prepare_internal(&prepare_data); + /* + * The transaction has been serialized to file, so replay all the + * spooled operations. + */ + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, + prepare_data.xid, prepare_data.prepare_lsn); - CommitTransactionCommand(); + /* Mark the transaction as prepared. */ + apply_handle_prepare_internal(&prepare_data); - pgstat_report_stat(false); + CommitTransactionCommand(); - store_flush_position(prepare_data.end_lsn); + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); - in_remote_transaction = false; + in_remote_transaction = false; + + /* Unlink the files with serialized changes and subxact info. */ + stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); + + elog(DEBUG1, "finished processing the STREAM PREPARE command"); + break; - /* unlink the files with serialized changes and subxact info. */ - stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the streaming transaction. */ + pa_xact_finish(winfo, prepare_data.end_lsn); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_open_and_write_change(prepare_data.xid, + LOGICAL_REP_MSG_STREAM_PREPARE, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the streaming transaction. */ + pa_xact_finish(winfo, prepare_data.end_lsn); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before preparing. + */ + if (stream_fd) + stream_close_file(); + + begin_replication_step(); + + /* Mark the transaction as prepared. 
*/ + apply_handle_prepare_internal(&prepare_data); + + end_replication_step(); + + CommitTransactionCommand(); + + MyParallelShared->last_commit_end = XactLastCommitEnd; + + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); + pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock); + + pa_reset_subtrans(); + + elog(DEBUG1, "finished processing the STREAM PREPARE command"); + break; + + default: + Assert(false); + break; + } + + pgstat_report_stat(false); /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1173,27 +1426,64 @@ apply_handle_origin(StringInfo s) } /* + * Initialize fileset (if not already done). + * + * Create a new file when first_segment is true, otherwise open the existing + * file. + */ +void +stream_start_internal(TransactionId xid, bool first_segment) +{ + begin_replication_step(); + + /* + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. + */ + if (!MyLogicalRepWorker->stream_fileset) + { + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); + } + + /* Open the spool file for this transaction. */ + stream_open_file(MyLogicalRepWorker->subid, xid, first_segment); + + /* If this is not the first segment, open existing subxact file. */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, xid); + + end_replication_step(); +} + +/* * Handle STREAM START message. */ static void apply_handle_stream_start(StringInfo s) { bool first_segment; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; if (in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); - /* - * Start a transaction on stream start, this transaction will be committed - * on the stream stop unless it is a tablesync worker in which case it - * will be committed after processing all the messages. We need the - * transaction for handling the buffile, used for serializing the - * streaming data and subxact info. - */ - begin_replication_step(); - /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; @@ -1207,54 +1497,119 @@ apply_handle_stream_start(StringInfo s) set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); - /* - * Initialize the worker's stream_fileset if we haven't yet. This will be - * used for the entire duration of the worker so create it in a permanent - * context. We create this on the very first streaming message from any - * transaction and then use it for this and other streaming transactions. - * Now, we could create a fileset at the start of the worker as well but - * then we won't be sure that it will ever be used. - */ - if (MyLogicalRepWorker->stream_fileset == NULL) + /* Try to allocate a worker for the streaming transaction. 
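The pa_allocate_worker call just below draws on a bounded pool and simply reports failure when nothing is free, at which point the leader falls back to serializing the stream. A sketch of that allocate-or-fall-back shape, using an illustrative fixed array rather than the real shared-memory hash and pool:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define POOL_SIZE 2  /* cf. max_parallel_apply_workers_per_subscription */

    /* Illustrative pool entry; the real workers live in shared memory. */
    typedef struct
    {
        bool     in_use;
        uint32_t xid;
    } PoolEntry;

    static PoolEntry pool[POOL_SIZE];

    /* On the first STREAM START of an xact, try to grab a pooled worker. */
    static PoolEntry *
    allocate_worker(uint32_t xid)
    {
        for (int i = 0; i < POOL_SIZE; i++)
        {
            if (!pool[i].in_use)
            {
                pool[i].in_use = true;
                pool[i].xid = xid;
                return &pool[i];
            }
        }
        return NULL;   /* none free: fall back to serializing to a file */
    }

    /* Later chunks look the transaction's worker up by xid. */
    static PoolEntry *
    find_worker(uint32_t xid)
    {
        for (int i = 0; i < POOL_SIZE; i++)
            if (pool[i].in_use && pool[i].xid == xid)
                return &pool[i];
        return NULL;
    }

    int
    main(void)
    {
        allocate_worker(100);
        allocate_worker(200);
        printf("xid 100 assigned: %s\n", find_worker(100) ? "yes" : "no");
        printf("xid 300 gets worker: %s\n", allocate_worker(300) ? "yes" : "no");
        return 0;
    }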
*/ + if (first_segment) + pa_allocate_worker(stream_xid); + + apply_action = get_transaction_apply_action(stream_xid, &winfo); + + switch (apply_action) { - MemoryContext oldctx; + case TRANS_LEADER_SERIALIZE: - oldctx = MemoryContextSwitchTo(ApplyContext); + /* + * Function stream_start_internal starts a transaction. This + * transaction will be committed on the stream stop unless it is a + * tablesync worker in which case it will be committed after + * processing all the messages. We need this transaction for + * handling the BufFile, used for serializing the streaming data + * and subxact info. + */ + stream_start_internal(stream_xid, first_segment); + break; - MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); - FileSetInit(MyLogicalRepWorker->stream_fileset); + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); - MemoryContextSwitchTo(oldctx); - } + /* + * Once we start serializing the changes, the parallel apply + * worker will wait for the leader to release the stream lock + * until the end of the transaction. So, we don't need to release + * the lock or increment the stream count in that case. + */ + if (pa_send_data(winfo, s->len, s->data)) + { + /* + * Unlock the shared object lock so that the parallel apply + * worker can continue to receive changes. + */ + if (!first_segment) + pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); - /* open the spool file for this transaction */ - stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); + /* + * Increment the number of streaming blocks waiting to be + * processed by parallel apply worker. + */ + pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); - /* if this is not the first segment, open existing subxact file */ - if (!first_segment) - subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + /* Cache the parallel apply worker for this transaction. */ + pa_set_stream_apply_worker(winfo); + break; + } - pgstat_report_activity(STATE_RUNNING, NULL); + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, !first_segment); - end_replication_step(); + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Open the spool file unless it was already opened when switching + * to serialize mode. The transaction started in + * stream_start_internal will be committed on the stream stop. + */ + if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL) + stream_start_internal(stream_xid, first_segment); + + stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg); + + /* Cache the parallel apply worker for this transaction. */ + pa_set_stream_apply_worker(winfo); + break; + + case TRANS_PARALLEL_APPLY: + if (first_segment) + { + /* Hold the lock until the end of the transaction. */ + pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock); + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED); + + /* + * Signal the leader apply worker, as it may be waiting for + * us. + */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + } + + parallel_stream_nchanges = 0; + break; + + default: + Assert(false); + break; + } + + pgstat_report_activity(STATE_RUNNING, NULL); } /* - * Handle STREAM STOP message. + * Update the information about subxacts and close the file. + * + * This function should be called when the stream_start_internal function has + * been called. 
*/ -static void -apply_handle_stream_stop(StringInfo s) +void +stream_stop_internal(TransactionId xid) { - if (!in_streamed_transaction) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("STREAM STOP message without STREAM START"))); - /* - * Close the file with serialized changes, and serialize information about - * subxacts for the toplevel transaction. + * Serialize information about subxacts for the toplevel transaction, then + * close the stream messages spool file. */ - subxact_info_write(MyLogicalRepWorker->subid, stream_xid); + subxact_info_write(MyLogicalRepWorker->subid, xid); stream_close_file(); /* We must be in a valid transaction state */ @@ -1263,40 +1618,124 @@ apply_handle_stream_stop(StringInfo s) /* Commit the per-stream transaction */ CommitTransactionCommand(); - in_streamed_transaction = false; - /* Reset per-stream context */ MemoryContextReset(LogicalStreamingContext); - - pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); } /* - * Handle STREAM abort message. + * Handle STREAM STOP message. */ static void -apply_handle_stream_abort(StringInfo s) +apply_handle_stream_stop(StringInfo s) { - TransactionId xid; - TransactionId subxid; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; - if (in_streamed_transaction) + if (!in_streamed_transaction) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("STREAM ABORT message without STREAM STOP"))); + errmsg_internal("STREAM STOP message without STREAM START"))); + + apply_action = get_transaction_apply_action(stream_xid, &winfo); + + switch (apply_action) + { + case TRANS_LEADER_SERIALIZE: + stream_stop_internal(stream_xid); + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * Lock before sending the STREAM_STOP message so that the leader + * can hold the lock first and the parallel apply worker will wait + * for leader to release the lock. See Locking Considerations atop + * applyparallelworker.c. + */ + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); + + if (pa_send_data(winfo, s->len, s->data)) + { + pa_set_stream_apply_worker(NULL); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s); + stream_stop_internal(stream_xid); + pa_set_stream_apply_worker(NULL); + break; + + case TRANS_PARALLEL_APPLY: + elog(DEBUG1, "applied %u changes in the streaming chunk", + parallel_stream_nchanges); + + /* + * By the time parallel apply worker is processing the changes in + * the current streaming block, the leader apply worker may have + * sent multiple streaming blocks. This can lead to parallel apply + * worker start waiting even when there are more chunk of streams + * in the queue. So, try to lock only if there is no message left + * in the queue. See Locking Considerations atop + * applyparallelworker.c. + * + * Note that here we have a race condition where we can start + * waiting even when there are pending streaming chunks. This can + * happen if the leader sends another streaming block and acquires + * the stream lock again after the parallel apply worker checks + * that there is no pending streaming block and before it actually + * starts waiting on a lock. 
We can handle this case by not + * allowing the leader to increment the stream block count during + * the time parallel apply worker acquires the lock but it is not + * clear whether that is worth the complexity. + * + * Now, if this missed chunk contains rollback to savepoint, then + * there is a risk of deadlock which probably shouldn't happen + * after restart. + */ + pa_decr_and_wait_stream_block(); + break; + + default: + Assert(false); + break; + } + + in_streamed_transaction = false; - logicalrep_read_stream_abort(s, &xid, &subxid); + /* + * The parallel apply worker could be in a transaction in which case we + * need to report the state as STATE_IDLEINTRANSACTION. + */ + if (IsTransactionOrTransactionBlock()) + pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL); + else + pgstat_report_activity(STATE_IDLE, NULL); + reset_apply_error_context_info(); +} + +/* + * Helper function to handle STREAM ABORT message when the transaction was + * serialized to file. + */ +static void +stream_abort_internal(TransactionId xid, TransactionId subxid) +{ /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so * just delete the files with serialized info. */ if (xid == subxid) - { - set_apply_error_context_xact(xid, InvalidXLogRecPtr); stream_cleanup_files(MyLogicalRepWorker->subid, xid); - } else { /* @@ -1320,8 +1759,6 @@ apply_handle_stream_abort(StringInfo s) bool found = false; char path[MAXPGPATH]; - set_apply_error_context_xact(subxid, InvalidXLogRecPtr); - subidx = -1; begin_replication_step(); subxact_info_read(MyLogicalRepWorker->subid, xid); @@ -1346,7 +1783,6 @@ apply_handle_stream_abort(StringInfo s) cleanup_subxact_info(); end_replication_step(); CommitTransactionCommand(); - reset_apply_error_context_info(); return; } @@ -1369,24 +1805,215 @@ apply_handle_stream_abort(StringInfo s) end_replication_step(); CommitTransactionCommand(); } +} + +/* + * Handle STREAM ABORT message. + */ +static void +apply_handle_stream_abort(StringInfo s) +{ + TransactionId xid; + TransactionId subxid; + LogicalRepStreamAbortData abort_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + bool toplevel_xact; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM ABORT message without STREAM STOP"))); + + /* We receive abort information only when we can apply in parallel. */ + logicalrep_read_stream_abort(s, &abort_data, + MyLogicalRepWorker->parallel_apply); + + xid = abort_data.xid; + subxid = abort_data.subxid; + toplevel_xact = (xid == subxid); + + set_apply_error_context_xact(subxid, abort_data.abort_lsn); + + apply_action = get_transaction_apply_action(xid, &winfo); + + switch (apply_action) + { + case TRANS_LEADER_SERIALIZE: + + /* + * We are in the leader apply worker and the transaction has been + * serialized to file. + */ + stream_abort_internal(xid, subxid); + + elog(DEBUG1, "finished processing the STREAM ABORT command"); + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + /* + * For the case of aborting the subtransaction, we increment the + * number of streaming blocks and take the lock again before + * sending the STREAM_ABORT to ensure that the parallel apply + * worker will wait on the lock for the next set of changes after + * processing the STREAM_ABORT message if it is not already + * waiting for STREAM_STOP message. 
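On the apply side, each streamed subtransaction becomes a savepoint, and aborting a subxid discards it together with everything nested above it. The sketch below shows that stack discipline with plain arrays; it illustrates the idea only and is not the pa_start_subtrans or pa_stream_abort implementation:

    #include <stdint.h>
    #include <stdio.h>

    #define MAX_SUBXACTS 16

    /* Illustrative subxact stack; one per in-progress transaction. */
    static uint32_t subxact_stack[MAX_SUBXACTS];
    static int nsubxacts;

    /* STREAM START of a subxact: remember it (cf. defining a savepoint). */
    static void
    start_subxact(uint32_t subxid)
    {
        subxact_stack[nsubxacts++] = subxid;
    }

    /*
     * STREAM ABORT of a subxact: pop everything down to and including it,
     * the moral equivalent of ROLLBACK TO SAVEPOINT for the matching entry.
     */
    static void
    abort_subxact(uint32_t subxid)
    {
        for (int i = nsubxacts - 1; i >= 0; i--)
        {
            if (subxact_stack[i] == subxid)
            {
                nsubxacts = i;  /* discard it and everything above it */
                return;
            }
        }
        /* Unknown subxid: nothing was applied for it, nothing to undo. */
    }

    int
    main(void)
    {
        start_subxact(501);
        start_subxact(502);
        start_subxact(503);
        abort_subxact(502);     /* also discards 503 */
        printf("subxacts still open: %d\n", nsubxacts);
        return 0;
    }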
+ * + * It is important to perform this locking before sending the + * STREAM_ABORT message so that the leader can hold the lock first + * and the parallel apply worker will wait for the leader to + * release the lock. This is the same as what we do in + * apply_handle_stream_stop. See Locking Considerations atop + * applyparallelworker.c. + */ + if (!toplevel_xact) + { + pa_unlock_stream(xid, AccessExclusiveLock); + pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1); + pa_lock_stream(xid, AccessExclusiveLock); + } + + if (pa_send_data(winfo, s->len, s->data)) + { + /* + * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to + * wait here for the parallel apply worker to finish as that + * is not required to maintain the commit order and won't have + * the risk of failures due to transaction dependencies and + * deadlocks. However, it is possible that before the parallel + * worker finishes and we clear the worker info, the xid + * wraparound happens on the upstream and a new transaction + * with the same xid can appear and that can lead to duplicate + * entries in ParallelApplyTxnHash. Yet another problem could + * be that we may have serialized the changes in partial + * serialize mode and the file containing xact changes may + * already exist, and after xid wraparound trying to create + * the file for the same xid can lead to an error. To avoid + * these problems, we decide to wait for the aborts to finish. + * + * Note, it is okay to not update the flush location position + * for aborts as in worst case that means such a transaction + * won't be sent again after restart. + */ + if (toplevel_xact) + pa_xact_finish(winfo, InvalidXLogRecPtr); + + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + /* + * Parallel apply worker might have applied some changes, so write + * the STREAM_ABORT message so that it can rollback the + * subtransaction if needed. + */ + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT, + &original_msg); + + if (toplevel_xact) + { + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + pa_xact_finish(winfo, InvalidXLogRecPtr); + } + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before aborting. + */ + if (toplevel_xact && stream_fd) + stream_close_file(); + + pa_stream_abort(&abort_data); + + /* + * We need to wait after processing rollback to savepoint for the + * next set of changes. + * + * We have a race condition here due to which we can start waiting + * here when there are more chunk of streams in the queue. See + * apply_handle_stream_stop. + */ + if (!toplevel_xact) + pa_decr_and_wait_stream_block(); + + elog(DEBUG1, "finished processing the STREAM ABORT command"); + break; + + default: + Assert(false); + break; + } reset_apply_error_context_info(); } /* - * Common spoolfile processing. + * Ensure that the passed location is fileset's end. 
*/ static void -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) +ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, + off_t offset) +{ + char path[MAXPGPATH]; + BufFile *fd; + int last_fileno; + off_t last_offset; + + Assert(!IsTransactionState()); + + begin_replication_step(); + + changes_filename(path, MyLogicalRepWorker->subid, xid); + + fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + BufFileSeek(fd, 0, 0, SEEK_END); + BufFileTell(fd, &last_fileno, &last_offset); + + BufFileClose(fd); + + end_replication_step(); + + if (last_fileno != fileno || last_offset != offset) + elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"", + path); +} + +/* + * Common spoolfile processing. + */ +void +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn) { StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; MemoryContext oldcxt; - BufFile *fd; + ResourceOwner oldowner; + int fileno; + off_t offset; - maybe_start_skipping_changes(lsn); + if (!am_parallel_apply_worker()) + maybe_start_skipping_changes(lsn); /* Make sure we have an open transaction */ begin_replication_step(); @@ -1402,8 +2029,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, - false); + /* + * Make sure the file is owned by the toplevel transaction so that the + * file will not be accidentally closed when aborting a subtransaction. + */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false); + + CurrentResourceOwner = oldowner; buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -1434,7 +2069,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) CHECK_FOR_INTERRUPTS(); /* read length of the on-disk record */ - nbytes = BufFileRead(fd, &len, sizeof(len)); + nbytes = BufFileRead(stream_fd, &len, sizeof(len)); /* have we reached end of the file? */ if (nbytes == 0) @@ -1455,12 +2090,14 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) buffer = repalloc(buffer, len); /* and finally read the data into the buffer */ - if (BufFileRead(fd, buffer, len) != len) + if (BufFileRead(stream_fd, buffer, len) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from streaming transaction's changes file \"%s\": %m", path))); + BufFileTell(stream_fd, &fileno, &offset); + /* copy the buffer to the stringinfo and call apply_dispatch */ resetStringInfo(&s2); appendBinaryStringInfo(&s2, buffer, len); @@ -1476,15 +2113,24 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) nchanges++; + /* + * It is possible the file has been closed because we have processed + * the transaction end message like stream_commit in which case that + * must be the last message. 
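The replay loop above consumes length-prefixed records, and once a transaction-end message closes the file, ensure_last_message checks that nothing trails it. The same shape as a standalone program, with stdio standing in for BufFile:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /*
     * Write a few length-prefixed records, replay them, and verify that
     * replay stopped exactly at end-of-file (cf. ensure_last_message).
     */
    int
    main(void)
    {
        const char *msgs[] = {"begin", "insert", "commit"};
        FILE *f = tmpfile();
        size_t len;
        char buf[64];
        long stop_at, end;

        if (!f)
            return 1;

        for (int i = 0; i < 3; i++)
        {
            len = strlen(msgs[i]);
            fwrite(&len, sizeof(len), 1, f);
            fwrite(msgs[i], len, 1, f);
        }

        rewind(f);
        while (fread(&len, sizeof(len), 1, f) == 1)
        {
            if (fread(buf, len, 1, f) != 1)
            {
                fprintf(stderr, "could not read record\n");
                return 1;
            }
            printf("replayed %.*s\n", (int) len, buf);
            if (memcmp(buf, "commit", len) == 0)
                break;          /* transaction end: stop replaying */
        }

        /* Anything after the end message would indicate a corrupt spool. */
        stop_at = ftell(f);
        fseek(f, 0, SEEK_END);
        end = ftell(f);
        printf(end == stop_at ? "spool ends at commit: ok\n"
                              : "unexpected trailing message\n");
        return 0;
    }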
+ */ + if (!stream_fd) + { + ensure_last_message(stream_fileset, xid, fileno, offset); + break; + } + if (nchanges % 1000 == 0) elog(DEBUG1, "replayed %d changes from file \"%s\"", nchanges, path); } - BufFileClose(fd); - - pfree(buffer); - pfree(s2.data); + if (stream_fd) + stream_close_file(); elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); @@ -1500,6 +2146,11 @@ apply_handle_stream_commit(StringInfo s) { TransactionId xid; LogicalRepCommitData commit_data; + ParallelApplyWorkerInfo *winfo; + TransApplyAction apply_action; + + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; if (in_streamed_transaction) ereport(ERROR, @@ -1509,14 +2160,85 @@ apply_handle_stream_commit(StringInfo s) xid = logicalrep_read_stream_commit(s, &commit_data); set_apply_error_context_xact(xid, commit_data.commit_lsn); - elog(DEBUG1, "received commit for streamed transaction %u", xid); + apply_action = get_transaction_apply_action(xid, &winfo); - apply_spooled_messages(xid, commit_data.commit_lsn); + switch (apply_action) + { + case TRANS_LEADER_SERIALIZE: - apply_handle_commit_internal(&commit_data); + /* + * The transaction has been serialized to file, so replay all the + * spooled operations. + */ + apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, + commit_data.commit_lsn); + + apply_handle_commit_internal(&commit_data); + + /* Unlink the files with serialized changes and subxact info. */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + elog(DEBUG1, "finished processing the STREAM COMMIT command"); + break; + + case TRANS_LEADER_SEND_TO_PARALLEL: + Assert(winfo); + + if (pa_send_data(winfo, s->len, s->data)) + { + /* Finish processing the streaming transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + } + + /* + * Switch to serialize mode when we are not able to send the + * change to parallel apply worker. + */ + pa_switch_to_partial_serialize(winfo, true); + + /* fall through */ + case TRANS_LEADER_PARTIAL_SERIALIZE: + Assert(winfo); + + stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT, + &original_msg); + + pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE); + + /* Finish processing the streaming transaction. */ + pa_xact_finish(winfo, commit_data.end_lsn); + break; + + case TRANS_PARALLEL_APPLY: + + /* + * If the parallel apply worker is applying spooled messages then + * close the file before committing. + */ + if (stream_fd) + stream_close_file(); + + apply_handle_commit_internal(&commit_data); + + MyParallelShared->last_commit_end = XactLastCommitEnd; + + /* + * It is important to set the transaction state as finished before + * releasing the lock. See pa_wait_for_xact_finish. + */ + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); + pa_unlock_transaction(xid, AccessExclusiveLock); - /* unlink the files with serialized changes and subxact info */ - stream_cleanup_files(MyLogicalRepWorker->subid, xid); + pa_reset_subtrans(); + + elog(DEBUG1, "finished processing the STREAM COMMIT command"); + break; + + default: + Assert(false); + break; + } /* Process any tables that are being synchronized in parallel. 
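The store_flush_position and get_flush_position changes in the hunks just below keep a mapping from each remote commit LSN to the local LSN it produced, so feedback to the publisher advances only once the corresponding local WAL is durable. A self-contained sketch of that mapping, using a plain array where the worker keeps a dlist:

    #include <stdint.h>
    #include <stdio.h>

    #define MAX_POSITIONS 8

    /* Illustrative (remote LSN, local LSN) pair; cf. FlushPosition. */
    typedef struct
    {
        uint64_t remote_end;
        uint64_t local_end;
    } FlushPos;

    static FlushPos lsn_map[MAX_POSITIONS];
    static int npos;

    /* Record one pair per applied remote transaction, in commit order. */
    static void
    store_flush_pos(uint64_t remote, uint64_t local)
    {
        lsn_map[npos++] = (FlushPos){remote, local};
    }

    /*
     * Given how far the local WAL has been flushed, report the newest
     * remote LSN that is now durable locally and drop the entries covered.
     */
    static uint64_t
    get_flush_pos(uint64_t local_flushed)
    {
        uint64_t remote = 0;
        int keep = 0;

        for (int i = 0; i < npos; i++)
        {
            if (lsn_map[i].local_end <= local_flushed)
                remote = lsn_map[i].remote_end;
            else
                lsn_map[keep++] = lsn_map[i];
        }
        npos = keep;
        return remote;
    }

    int
    main(void)
    {
        store_flush_pos(1000, 10);
        store_flush_pos(2000, 20);
        store_flush_pos(3000, 30);
        printf("flushed up to remote %llu\n",
               (unsigned long long) get_flush_pos(25));  /* prints 2000 */
        return 0;
    }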
 */
 process_syncing_tables(commit_data.end_lsn);
@@ -1560,9 +2282,16 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
 replorigin_session_origin_timestamp = commit_data->committime;
 CommitTransactionCommand();
+
+ if (IsTransactionBlock())
+ {
+ EndTransactionBlock(false);
+ CommitTransactionCommand();
+ }
+
 pgstat_report_stat(false);
- store_flush_position(commit_data->end_lsn);
+ store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
 }
 else
 {
@@ -2508,7 +3237,7 @@ apply_handle_truncate(StringInfo s)
 /*
 * Logical replication protocol message dispatcher.
 */
-static void
+void
 apply_dispatch(StringInfo s)
 {
 LogicalRepMsgType action = pq_getmsgbyte(s);
@@ -2672,17 +3401,24 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
 /*
 * Store current remote/local lsn pair in the tracking list.
 */
-static void
-store_flush_position(XLogRecPtr remote_lsn)
+void
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
 {
 FlushPosition *flushpos;
+ /*
+ * Skip for parallel apply workers, because the lsn_mapping is maintained
+ * by the leader apply worker.
+ */
+ if (am_parallel_apply_worker())
+ return;
+
 /* Need to do this in permanent context */
 MemoryContextSwitchTo(ApplyContext);
 /* Track commit lsn */
 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
- flushpos->local_end = XactLastCommitEnd;
+ flushpos->local_end = local_lsn;
 flushpos->remote_end = remote_lsn;
 dlist_push_tail(&lsn_mapping, &flushpos->node);
@@ -2741,6 +3477,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 errcallback.callback = apply_error_callback;
 errcallback.previous = error_context_stack;
 error_context_stack = &errcallback;
+ apply_error_context_stack = error_context_stack;
 /* This outer loop iterates once per wait. */
 for (;;)
@@ -2955,6 +3692,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 /* Pop the error context stack */
 error_context_stack = errcallback.previous;
+ apply_error_context_stack = error_context_stack;
 /* All done */
 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
@@ -3053,9 +3791,31 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
 }
 /*
- * Reread subscription info if needed. Most changes will be exit.
+ * Exit routine for apply workers due to subscription parameter changes.
 */
 static void
+apply_worker_exit(void)
+{
+ if (am_parallel_apply_worker())
+ {
+ /*
+ * Don't stop the parallel apply worker as the leader will detect the
+ * subscription parameter change and restart logical replication later
+ * anyway. This also prevents the leader from reporting errors when
+ * trying to communicate with a stopped parallel apply worker, which
+ * would accidentally disable subscriptions if disable_on_error was
+ * set.
+ */
+ return;
+ }
+
+ proc_exit(0);
+}
+
+/*
+ * Reread subscription info if needed. Most changes will result in the
+ * worker exiting.
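+ * Changes that disable or reconfigure the subscription exit via
+ * apply_worker_exit() above, which leaves a parallel apply worker running
+ * for the leader to handle.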
+ */
+void
 maybe_reread_subscription(void)
 {
 MemoryContext oldctx;
@@ -3085,9 +3845,9 @@ maybe_reread_subscription(void)
 if (!newsub)
 {
 ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was removed",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
+ get_worker_name(), MySubscription->name)));
 proc_exit(0);
 }
@@ -3096,11 +3856,11 @@ maybe_reread_subscription(void)
 if (!newsub->enabled)
 {
 ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was disabled",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
 }
 /* !slotname should never happen when enabled is true. */
@@ -3111,7 +3871,10 @@ maybe_reread_subscription(void)
 /*
 * Exit if any parameter that affects the remote connection was changed.
- * The launcher will start a new worker.
+ * The launcher will start a new worker, but note that the parallel apply
+ * worker won't restart if the streaming option's value is changed from
+ * 'parallel' to any other value, or if the server decides not to stream
+ * the in-progress transaction.
 */
 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
 strcmp(newsub->name, MySubscription->name) != 0 ||
@@ -3122,11 +3885,17 @@ maybe_reread_subscription(void)
 newsub->owner != MySubscription->owner ||
 !equal(newsub->publications, MySubscription->publications))
 {
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
- MySubscription->name)));
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
 }
 /* Check for other changes that should never happen too. */
@@ -3378,7 +4147,7 @@ changes_filename(char *path, Oid subid, TransactionId xid)
 * toplevel transaction. Each subscription has a separate set of files
 * for any toplevel transaction.
 */
-static void
+void
 stream_cleanup_files(Oid subid, TransactionId xid)
 {
 char path[MAXPGPATH];
@@ -3401,9 +4170,6 @@ stream_cleanup_files(Oid subid, TransactionId xid)
 * by stream_xid (global variable). If it's the first chunk of streamed
 * changes for this transaction, create the buffile, otherwise open the
 * previously created file.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
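+ * It may now also be called outside a stream_start/stream_stop block;
+ * see stream_open_and_write_change() below, which serializes a message
+ * while no stream is open.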
 */
 static void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
@@ -3411,7 +4177,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 char path[MAXPGPATH];
 MemoryContext oldcxt;
- Assert(in_streamed_transaction);
 Assert(OidIsValid(subid));
 Assert(TransactionIdIsValid(xid));
 Assert(stream_fd == NULL);
@@ -3450,15 +4215,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 /*
 * stream_close_file
 * Close the currently open file with streamed changes.
- *
- * This can only be called at the end of a streaming block, i.e. at stream_stop
- * message from the upstream.
 */
 static void
 stream_close_file(void)
 {
- Assert(in_streamed_transaction);
- Assert(TransactionIdIsValid(stream_xid));
 Assert(stream_fd != NULL);
 BufFileClose(stream_fd);
@@ -3480,8 +4240,6 @@ stream_write_change(char action, StringInfo s)
 {
 int len;
- Assert(in_streamed_transaction);
- Assert(TransactionIdIsValid(stream_xid));
 Assert(stream_fd != NULL);
 /* total on-disk size, including the action type character */
@@ -3500,6 +4258,26 @@ stream_write_change(char action, StringInfo s)
 }
 /*
+ * stream_open_and_write_change
+ * Serialize a message to a file for the given transaction.
+ *
+ * This function is similar to stream_write_change except that it will open
+ * the target file if it is not already open before writing the message, and
+ * will close the file at the end.
+ */
+static void
+stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
+{
+ Assert(!in_streamed_transaction);
+
+ if (!stream_fd)
+ stream_start_internal(xid, false);
+
+ stream_write_change(action, s);
+ stream_stop_internal(xid);
+}
+
+/*
 * Cleanup the memory for subxacts and reset the related variables.
 */
 static inline void
@@ -3610,37 +4388,16 @@ start_apply(XLogRecPtr origin_startpos)
 PG_END_TRY();
 }
-/* Logical Replication Apply worker entry point */
+/*
+ * Common initialization for leader apply worker and parallel apply worker.
+ *
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
 void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
 {
- int worker_slot = DatumGetInt32(main_arg);
 MemoryContext oldctx;
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos = InvalidXLogRecPtr;
- char *myslotname = NULL;
- WalRcvStreamOptions options;
- int server_version;
-
- /* Attach to slot */
- logicalrep_worker_attach(worker_slot);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGTERM, die);
- BackgroundWorkerUnblockSignals();
-
- /*
- * We don't currently need any ResourceOwner in a walreceiver process, but
- * if we did, we could call CreateAuxProcessResourceOwner here.
- */
-
- /* Initialise stats to a sanish value */
- MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
- MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
 /* Run as replica session replication role.
*/ SetConfigOption("session_replication_role", "replica", @@ -3668,9 +4425,10 @@ ApplyWorkerMain(Datum main_arg) if (!MySubscription) { ereport(LOG, - (errmsg("logical replication apply worker for subscription %u will not " - "start because the subscription was removed during startup", - MyLogicalRepWorker->subid))); + /* translator: %s is the name of logical replication worker */ + (errmsg("%s for subscription %u will not start because the subscription was removed during startup", + get_worker_name(), MyLogicalRepWorker->subid))); + proc_exit(0); } @@ -3680,11 +4438,11 @@ ApplyWorkerMain(Datum main_arg) if (!MySubscription->enabled) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will not " - "start because the subscription was disabled during startup", - MySubscription->name))); + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup", + get_worker_name(), MySubscription->name))); - proc_exit(0); + apply_worker_exit(); } /* Setup synchronous commit according to the user's wishes */ @@ -3699,13 +4457,49 @@ ApplyWorkerMain(Datum main_arg) if (am_tablesync_worker()) ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", - MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); else ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" has started", - MySubscription->name))); + /* translator: first %s is the name of logical replication worker */ + (errmsg("%s for subscription \"%s\" has started", + get_worker_name(), MySubscription->name))); CommitTransactionCommand(); +} + +/* Logical Replication Apply worker entry point */ +void +ApplyWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *myslotname = NULL; + WalRcvStreamOptions options; + int server_version; + + /* Attach to slot */ + logicalrep_worker_attach(worker_slot); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * We don't currently need any ResourceOwner in a walreceiver process, but + * if we did, we could call CreateAuxProcessResourceOwner here. + */ + + /* Initialise stats to a sanish value */ + MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = + MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + InitializeApplyWorker(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", @@ -3715,20 +4509,15 @@ ApplyWorkerMain(Datum main_arg) { start_table_sync(&origin_startpos, &myslotname); - /* - * Allocate the origin name in long-lived context for error context - * message. 
- */ ReplicationOriginNameForLogicalRep(MySubscription->oid, MyLogicalRepWorker->relid, originname, sizeof(originname)); - apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, - originname); + set_apply_error_context_origin(originname); } else { - /* This is main apply worker */ + /* This is the leader apply worker */ RepOriginId originid; TimeLineID startpointTLI; char *err; @@ -3752,7 +4541,7 @@ ApplyWorkerMain(Datum main_arg) originid = replorigin_by_name(originname, true); if (!OidIsValid(originid)) originid = replorigin_create(originname); - replorigin_session_setup(originid); + replorigin_session_setup(originid, 0); replorigin_session_origin = originid; origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); @@ -3770,12 +4559,7 @@ ApplyWorkerMain(Datum main_arg) */ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); - /* - * Allocate the origin name in long-lived context for error context - * message. - */ - apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, - originname); + set_apply_error_context_origin(originname); } /* @@ -3793,13 +4577,36 @@ ApplyWorkerMain(Datum main_arg) server_version = walrcv_server_version(LogRepWorkerWalRcvConn); options.proto.logical.proto_version = + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; - options.proto.logical.streaming = MySubscription->stream; + + /* + * Assign the appropriate option value for streaming option according to + * the 'streaming' mode and the publisher's ability to support that mode. + */ + if (server_version >= 160000 && + MySubscription->stream == LOGICALREP_STREAM_PARALLEL) + { + options.proto.logical.streaming_str = "parallel"; + MyLogicalRepWorker->parallel_apply = true; + } + else if (server_version >= 140000 && + MySubscription->stream != LOGICALREP_STREAM_OFF) + { + options.proto.logical.streaming_str = "on"; + MyLogicalRepWorker->parallel_apply = false; + } + else + { + options.proto.logical.streaming_str = NULL; + MyLogicalRepWorker->parallel_apply = false; + } + options.proto.logical.twophase = false; options.proto.logical.origin = pstrdup(MySubscription->origin); @@ -3897,6 +4704,15 @@ IsLogicalWorker(void) } /* + * Is current process a logical replication parallel apply worker? + */ +bool +IsLogicalParallelApplyWorker(void) +{ + return IsLogicalWorker() && am_parallel_apply_worker(); +} + +/* * Start skipping changes of the transaction if the given LSN matches the * LSN specified by subscription's skiplsn. 
*/ @@ -3958,7 +4774,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) XLogRecPtr myskiplsn = MySubscription->skiplsn; bool started_tx = false; - if (likely(XLogRecPtrIsInvalid(myskiplsn))) + if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker()) return; if (!IsTransactionState()) @@ -4030,7 +4846,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) } /* Error callback to give more context info about the change being applied */ -static void +void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; @@ -4058,23 +4874,47 @@ apply_error_callback(void *arg) errarg->remote_xid, LSN_FORMAT_ARGS(errarg->finish_lsn)); } - else if (errarg->remote_attnum < 0) - errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X", - errarg->origin_name, - logicalrep_message_type(errarg->command), - errarg->rel->remoterel.nspname, - errarg->rel->remoterel.relname, - errarg->remote_xid, - LSN_FORMAT_ARGS(errarg->finish_lsn)); else - errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X", - errarg->origin_name, - logicalrep_message_type(errarg->command), - errarg->rel->remoterel.nspname, - errarg->rel->remoterel.relname, - errarg->rel->remoterel.attnames[errarg->remote_attnum], - errarg->remote_xid, - LSN_FORMAT_ARGS(errarg->finish_lsn)); + { + if (errarg->remote_attnum < 0) + { + if (XLogRecPtrIsInvalid(errarg->finish_lsn)) + errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u", + errarg->origin_name, + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->remote_xid); + else + errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X", + errarg->origin_name, + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->remote_xid, + LSN_FORMAT_ARGS(errarg->finish_lsn)); + } + else + { + if (XLogRecPtrIsInvalid(errarg->finish_lsn)) + errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", + errarg->origin_name, + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->rel->remoterel.attnames[errarg->remote_attnum], + errarg->remote_xid); + else + errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X", + errarg->origin_name, + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->rel->remoterel.attnames[errarg->remote_attnum], + errarg->remote_xid, + LSN_FORMAT_ARGS(errarg->finish_lsn)); + } + } } /* Set transaction information of apply error callback */ @@ -4144,3 +4984,53 @@ AtEOXact_LogicalRepWorkers(bool isCommit) /* The List storage will be reclaimed automatically in xact cleanup. 
*/ on_commit_wakeup_workers_subids = NIL; } + +/* + * Allocate the origin name in long-lived context for error context message. + */ +void +set_apply_error_context_origin(char *originname) +{ + apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext, + originname); +} + +/* + * Return the action to be taken for the given transaction. *winfo is + * assigned to the destination parallel worker info when the leader apply + * worker has to pass all the transaction's changes to the parallel apply + * worker. + */ +static TransApplyAction +get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) +{ + *winfo = NULL; + + if (am_parallel_apply_worker()) + { + return TRANS_PARALLEL_APPLY; + } + else if (in_remote_transaction) + { + return TRANS_LEADER_APPLY; + } + + /* + * Check if we are processing this transaction using a parallel apply + * worker. + */ + *winfo = pa_find_worker(xid); + + if (!*winfo) + { + return TRANS_LEADER_SERIALIZE; + } + else if ((*winfo)->serialize_changes) + { + return TRANS_LEADER_PARTIAL_SERIALIZE; + } + else + { + return TRANS_LEADER_SEND_TO_PARALLEL; + } +} diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 876adab38e5..19c10c028f9 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -18,6 +18,7 @@ #include "catalog/pg_publication_rel.h" #include "catalog/pg_subscription.h" #include "commands/defrem.h" +#include "commands/subscriptioncmds.h" #include "executor/executor.h" #include "fmgr.h" #include "nodes/makefuncs.h" @@ -290,7 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool origin_option_given = false; data->binary = false; - data->streaming = false; + data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; @@ -369,7 +370,7 @@ parse_output_parameters(List *options, PGOutputData *data) errmsg("conflicting or redundant options"))); streaming_given = true; - data->streaming = defGetBoolean(defel); + data->streaming = defGetStreamingMode(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -461,13 +462,20 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, * we only allow it with sufficient version of the protocol, and when * the output plugin supports it. 
 */
- if (!data->streaming)
+ if (data->streaming == LOGICALREP_STREAM_OFF)
 ctx->streaming = false;
- else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+ else if (data->streaming == LOGICALREP_STREAM_ON &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
 ereport(ERROR,
 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
 else if (!ctx->streaming)
 ereport(ERROR,
 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1841,6 +1849,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 XLogRecPtr abort_lsn)
 {
 ReorderBufferTXN *toptxn;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
 /*
 * The abort should happen outside streaming block, even for streamed
@@ -1854,7 +1864,9 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 Assert(rbtxn_is_streamed(toptxn));
 OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
+ txn->xact_time.abort_time, write_abort_info);
+
 OutputPluginWrite(ctx, true);
 cleanup_rel_sync_cache(toptxn->xid, false);
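
A note on the option format used above: with this change, 'streaming' is no longer a plain boolean. parse_output_parameters() now stores one of LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_ON, or LOGICALREP_STREAM_PARALLEL, obtained via defGetStreamingMode() from commands/subscriptioncmds.h. As a rough illustration only (not part of the patch; the real defGetStreamingMode() lives in commands/subscriptioncmds.c and may differ in detail), such a parser could be written with the existing parse_bool() and pg_strcasecmp() helpers:

    /* Illustrative sketch only -- not part of this patch. */
    static char
    streaming_mode_from_string(const char *value)
    {
        bool    stream_on;

        /* The new mode added by this patch. */
        if (pg_strcasecmp(value, "parallel") == 0)
            return LOGICALREP_STREAM_PARALLEL;

        /* The historical boolean spellings keep working. */
        if (parse_bool(value, &stream_on))
            return stream_on ? LOGICALREP_STREAM_ON : LOGICALREP_STREAM_OFF;

        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg("streaming requires a Boolean value or \"parallel\"")));
        return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
    }

On the subscriber side, the corresponding mapping is done in ApplyWorkerMain() above: 'parallel' is requested only from publishers that speak LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM (server version 160000 and up), and the worker otherwise falls back to plain streaming or to no streaming at all.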