Diffstat (limited to 'src/backend/replication')
-rw-r--r--   src/backend/replication/libpqwalreceiver/libpqwalreceiver.c |    6
-rw-r--r--   src/backend/replication/logical/Makefile                    |    1
-rw-r--r--   src/backend/replication/logical/applyparallelworker.c       | 1630
-rw-r--r--   src/backend/replication/logical/decode.c                    |    5
-rw-r--r--   src/backend/replication/logical/launcher.c                  |  220
-rw-r--r--   src/backend/replication/logical/meson.build                 |    1
-rw-r--r--   src/backend/replication/logical/origin.c                    |   26
-rw-r--r--   src/backend/replication/logical/proto.c                     |   37
-rw-r--r--   src/backend/replication/logical/reorderbuffer.c             |    5
-rw-r--r--   src/backend/replication/logical/tablesync.c                 |   25
-rwxr-xr-x [-rw-r--r--]   src/backend/replication/logical/worker.c       | 1354
-rw-r--r--   src/backend/replication/pgoutput/pgoutput.c                 |   22
12 files changed, 3033 insertions, 299 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 219cd73b7fc..c40c6220db8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -443,9 +443,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
- if (options->proto.logical.streaming &&
- PQserverVersion(conn->streamConn) >= 140000)
- appendStringInfoString(&cmd, ", streaming 'on'");
+ if (options->proto.logical.streaming_str)
+ appendStringInfo(&cmd, ", streaming '%s'",
+ options->proto.logical.streaming_str);
if (options->proto.logical.twophase &&
PQserverVersion(conn->streamConn) >= 150000)
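As an illustration (a sketch with assumed values, not part of the patch): for a subscription created with streaming = parallel, streaming_str would be "parallel", so the option list built above would read proto_version '4', streaming 'parallel'.

	/* Minimal standalone sketch; requires postgres.h and lib/stringinfo.h. */
	StringInfoData cmd;

	initStringInfo(&cmd);
	appendStringInfo(&cmd, "proto_version '%u'", 4);
	appendStringInfo(&cmd, ", streaming '%s'", "parallel");
	/* cmd.data now holds: proto_version '4', streaming 'parallel' */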
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c4e2fdeb719..2dc25e37bb8 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = \
+ applyparallelworker.o \
decode.o \
launcher.o \
logical.o \
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
new file mode 100644
index 00000000000..2e5914d5d95
--- /dev/null
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -0,0 +1,1630 @@
+/*-------------------------------------------------------------------------
+ * applyparallelworker.c
+ * Support routines for applying xact by parallel apply worker
+ *
+ * Copyright (c) 2023, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/applyparallelworker.c
+ *
+ * This file contains the code to launch, set up, and tear down a parallel apply
+ * worker which receives the changes from the leader worker and invokes routines
+ * to apply those on the subscriber database. Additionally, this file contains
+ * routines that are intended to support setting up, using, and tearing down a
+ * ParallelApplyWorkerInfo which is required so the leader worker and parallel
+ * apply workers can communicate with each other.
+ *
+ * The parallel apply workers are assigned (if available) as soon as the xact's
+ * first stream is received for subscriptions that have set their 'streaming'
+ * option as parallel. The leader apply worker will send changes to this new
+ * worker via shared memory. We keep this worker assigned till the transaction
+ * commit is received and also wait for the worker to finish at commit. This
+ * preserves commit ordering and avoids file I/O in most cases, although we
+ * still need to spill to a file if there is no worker available. See comments
+ * atop logical/worker.c to learn more about streamed xacts whose changes are
+ * spilled to disk. It is important to maintain commit order to avoid failures
+ * due to: (a) transaction dependencies - say if we insert a row in the first
+ * transaction and update it in the second transaction on publisher then
+ * allowing the subscriber to apply both in parallel can lead to failure in the
+ * update; (b) deadlocks - allowing transactions that update the same set of
+ * rows/tables in the opposite order to be applied in parallel can lead to
+ * deadlocks.
+ *
+ * A worker pool is used to avoid restarting workers for each streaming
+ * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
+ * in the ParallelApplyWorkerPool. After successfully launching a new worker,
+ * its information is added to the ParallelApplyWorkerPool. Once the worker
+ * finishes applying the transaction, it is marked as available for re-use.
+ * Now, before starting a new worker to apply the streaming transaction, we
+ * check the list for any available worker. Note that we retain a maximum of
+ * half the max_parallel_apply_workers_per_subscription workers in the pool and
+ * after that, we simply exit the worker after applying the transaction.
+ *
+ * XXX This worker pool threshold is arbitrary and we can provide a GUC
+ * variable for this in the future if required.
+ *
+ * The leader apply worker will create a separate dynamic shared memory segment
+ * when each parallel apply worker starts. The reason for this design is that
+ * we cannot predict how many workers will be needed. It may be possible to
+ * allocate enough shared memory in one segment based on the maximum number of
+ * parallel apply workers (max_parallel_apply_workers_per_subscription), but
+ * this would waste memory if no process is actually started.
+ *
+ * The dynamic shared memory segment contains: (a) a shm_mq that is used to
+ * send changes in the transaction from leader apply worker to parallel apply
+ * worker; (b) another shm_mq that is used to send errors (and other messages
+ * reported via elog/ereport) from the parallel apply worker to leader apply
+ * worker; (c) necessary information to be shared among parallel apply workers
+ * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
+ *
+ * Locking Considerations
+ * ----------------------
+ * We have a risk of deadlock due to concurrently applying the transactions in
+ * parallel mode that were independent on the publisher side but became
+ * dependent on the subscriber side due to the different database structures
+ * (like schema of subscription tables, constraints, etc.) on each side. This
+ * can happen even without parallel mode when there are concurrent operations
+ * on the subscriber. In order to detect the deadlocks among leader (LA) and
+ * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
+ * next stream (set of changes) and LA waits for PA to finish the transaction.
+ * An alternative approach could be to not allow parallelism when the schema of
+ * tables is different between the publisher and subscriber but that would be
+ * too restrictive and would require the publisher to send much more
+ * information than it is currently sending.
+ *
+ * Consider a case where the subscribed table does not have a unique key on the
+ * publisher and has a unique key on the subscriber. The deadlock can happen in
+ * the following ways:
+ *
+ * 1) Deadlock between the leader apply worker and a parallel apply worker
+ *
+ * Consider that the parallel apply worker (PA) is executing TX-1 and the
+ * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
+ * Now, LA is waiting for PA because of the unique key constraint of the
+ * subscribed table while PA is waiting for LA to send the next stream of
+ * changes or transaction finish command message.
+ *
+ * In order for lmgr to detect this, we have LA acquire a session lock on the
+ * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
+ * trying to receive the next stream of changes. Specifically, LA will acquire
+ * the lock in AccessExclusive mode before sending the STREAM_STOP and will
+ * release it, if already acquired, after sending the STREAM_START, STREAM_ABORT
+ * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
+ * acquire the lock in AccessShare mode after processing STREAM_STOP and
+ * STREAM_ABORT (for subtransaction) and then release the lock immediately
+ * after acquiring it.
+ *
+ * The lock graph for the above example will look as follows:
+ * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
+ * acquire the stream lock) -> LA
+ *
+ * This way, when PA is waiting for LA for the next stream of changes, we can
+ * have a wait-edge from PA to LA in lmgr, which will make us detect the
+ * deadlock between LA and PA.
+ *
+ * 2) Deadlock between the leader apply worker and parallel apply workers
+ *
+ * This scenario is similar to the first case but TX-1 and TX-2 are executed by
+ * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
+ * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
+ * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
+ * transaction in order to preserve the commit order. There is a deadlock among
+ * the three processes.
+ *
+ * In order for lmgr to detect this, we have PA acquire a session lock (this is
+ * a different lock than the one referred to in the previous case; see
+ * pa_lock_transaction()) on the transaction being applied and have LA wait on
+ * the lock before proceeding in the transaction finish commands. Specifically,
+ * PA will acquire this lock in AccessExclusive mode before executing the first
+ * message of the transaction and release it at the xact end. LA will acquire
+ * this lock in AccessShare mode at transaction finish commands and release it
+ * immediately.
+ *
+ * The lock graph for the above example will look as follows:
+ * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
+ * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
+ * lock) -> LA
+ *
+ * This way, when LA is waiting to finish the transaction end command to preserve
+ * the commit order, we will be able to detect deadlock, if any.
+ *
+ * One might think we can use XactLockTableWait(), but XactLockTableWait()
+ * considers PREPARED TRANSACTION as still in progress which means the lock
+ * won't be released even after the parallel apply worker has prepared the
+ * transaction.
+ *
+ * 3) Deadlock when the shm_mq buffer is full
+ *
+ * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
+ * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
+ * wait to send messages, and this wait doesn't appear in lmgr.
+ *
+ * To avoid this wait, we use a non-blocking write and wait with a timeout. If
+ * the timeout is exceeded, the LA will serialize all the pending messages to
+ * a file and indicate to PA-2 that it needs to read that file for the remaining
+ * messages. Then LA will start waiting for commit as in the previous case
+ * which will detect deadlock if any. See pa_send_data() and
+ * enum TransApplyAction.
+ *
+ * Lock types
+ * ----------
+ * Both the stream lock and the transaction lock mentioned above are
+ * session-level locks because both locks could be acquired outside the
+ * transaction, and the stream lock in the leader needs to persist across
+ * transaction boundaries, i.e., until the end of the streaming transaction.
+ *-------------------------------------------------------------------------
+ */
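A condensed sketch (not part of the patch) of the stream-lock handshake described above, using the pa_lock_stream()/pa_unlock_stream() helpers defined later in this file:

	/* Leader (LA), around each stream chunk of transaction xid: */
	pa_lock_stream(xid, AccessExclusiveLock);	/* before sending STREAM_STOP */
	/* ... send STREAM_STOP; later send STREAM_START ... */
	pa_unlock_stream(xid, AccessExclusiveLock);	/* after sending STREAM_START */

	/* Parallel apply worker (PA), after processing STREAM_STOP: */
	pa_lock_stream(xid, AccessShareLock);	/* blocks while LA holds the lock */
	pa_unlock_stream(xid, AccessShareLock);	/* released right after acquiring */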
+
+#include "postgres.h"
+
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "pgstat.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/inval.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
+
+/*
+ * DSM keys for parallel apply worker. Unlike other parallel execution code,
+ * since we don't need to worry about DSM keys conflicting with plan_node_id we
+ * can use small integers.
+ */
+#define PARALLEL_APPLY_KEY_SHARED 1
+#define PARALLEL_APPLY_KEY_MQ 2
+#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
+
+/* Queue size of DSM, 16 MB for now. */
+#define DSM_QUEUE_SIZE (16 * 1024 * 1024)
+
+/*
+ * Error queue size of DSM. It is desirable to make it large enough that a
+ * typical ErrorResponse can be sent without blocking. That way, a worker that
+ * errors out can write the whole message into the queue and terminate without
+ * waiting for the user backend.
+ */
+#define DSM_ERROR_QUEUE_SIZE (16 * 1024)
+
+/*
+ * There are three fields in each message received by the parallel apply
+ * worker: start_lsn, end_lsn and send_time. Because we have updated these
+ * statistics in the leader apply worker, we can ignore these fields in the
+ * parallel apply worker (see function LogicalRepApplyLoop).
+ */
+#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
+
+/*
+ * The type of session-level lock on a transaction being applied on a logical
+ * replication subscriber.
+ */
+#define PARALLEL_APPLY_LOCK_STREAM 0
+#define PARALLEL_APPLY_LOCK_XACT 1
+
+/*
+ * Hash table entry to map xid to the parallel apply worker state.
+ */
+typedef struct ParallelApplyWorkerEntry
+{
+ TransactionId xid; /* Hash key -- must be first */
+ ParallelApplyWorkerInfo *winfo;
+} ParallelApplyWorkerEntry;
+
+/*
+ * A hash table used to cache the state of streaming transactions being applied
+ * by the parallel apply workers.
+ */
+static HTAB *ParallelApplyTxnHash = NULL;
+
+/*
+ * A list (pool) of active parallel apply workers. The information for
+ * the new worker is added to the list after successfully launching it. The
+ * list entry is removed if there are already enough workers in the worker
+ * pool at the end of the transaction. For more information about the worker
+ * pool, see comments atop this file.
+ */
+static List *ParallelApplyWorkerPool = NIL;
+
+/*
+ * Information shared between leader apply worker and parallel apply worker.
+ */
+ParallelApplyWorkerShared *MyParallelShared = NULL;
+
+/*
+ * Is there a message sent by a parallel apply worker that the leader apply
+ * worker needs to receive?
+ */
+volatile sig_atomic_t ParallelApplyMessagePending = false;
+
+/*
+ * Cache the parallel apply worker information required for applying the
+ * current streaming transaction. It is used to save the cost of searching the
+ * hash table when applying the changes between STREAM_START and STREAM_STOP.
+ */
+static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
+
+/* A list to maintain subtransactions, if any. */
+static List *subxactlist = NIL;
+
+static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
+static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
+static PartialFileSetState pa_get_fileset_state(void);
+
+/*
+ * Returns true if it is OK to start a parallel apply worker, false otherwise.
+ */
+static bool
+pa_can_start(void)
+{
+ /* Only leader apply workers can start parallel apply workers. */
+ if (!am_leader_apply_worker())
+ return false;
+
+ /*
+ * It is good to check for any change in the subscription parameter to
+ * avoid the case where for a very long time the change doesn't get
+ * reflected. This can happen when there is a constant flow of streaming
+ * transactions that are handled by parallel apply workers.
+ *
+ * It is better to do it before the below checks so that the latest values
+ * of subscription can be used for the checks.
+ */
+ maybe_reread_subscription();
+
+ /*
+ * Don't start a new parallel apply worker if the subscription is not
+ * using parallel streaming mode, or if the publisher does not support
+ * parallel apply.
+ */
+ if (!MyLogicalRepWorker->parallel_apply)
+ return false;
+
+ /*
+ * Don't start a new parallel worker if the user has set skiplsn, as it's
+ * possible that they want to skip the streaming transaction. For
+ * streaming transactions, we need to serialize the transaction to a file
+ * so that we can get the last LSN of the transaction to judge whether to
+ * skip before starting to apply the change.
+ *
+ * One might think that we could allow parallelism if the first lsn of the
+ * transaction is greater than skiplsn, but we don't send it with the
+ * STREAM START message, and it doesn't seem worth sending the extra eight
+ * bytes with the STREAM START to enable parallelism for this case.
+ */
+ if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
+ return false;
+
+ /*
+ * For streaming transactions that are being applied using a parallel
+ * apply worker, we cannot decide whether to apply the change for a
+ * relation that is not in the READY state (see
+ * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
+ * time. So, we don't start the new parallel apply worker in this case.
+ */
+ if (!AllTablesyncsReady())
+ return false;
+
+ return true;
+}
+
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a fixed-size worker info
+ * (ParallelApplyWorkerShared), a message queue, and an error queue.
+ *
+ * Returns true on success, false on failure.
+ */
+static bool
+pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
+{
+ shm_toc_estimator e;
+ Size segsize;
+ dsm_segment *seg;
+ shm_toc *toc;
+ ParallelApplyWorkerShared *shared;
+ shm_mq *mq;
+ Size queue_size = DSM_QUEUE_SIZE;
+ Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
+
+ /*
+ * Estimate how much shared memory we need.
+ *
+ * Because the TOC machinery may choose to insert padding of oddly-sized
+ * requests, we must estimate each chunk separately.
+ *
+ * We need one key to register the location of the header, and two other
+ * keys to track the locations of the message queue and the error message
+ * queue.
+ */
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
+ shm_toc_estimate_chunk(&e, queue_size);
+ shm_toc_estimate_chunk(&e, error_queue_size);
+
+ shm_toc_estimate_keys(&e, 3);
+ segsize = shm_toc_estimate(&e);
+
+ /* Create the shared memory segment and establish a table of contents. */
+ seg = dsm_create(segsize, 0);
+ if (!seg)
+ return false;
+
+ toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
+ segsize);
+
+ /* Set up the header region. */
+ shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
+ SpinLockInit(&shared->mutex);
+
+ shared->xact_state = PARALLEL_TRANS_UNKNOWN;
+ pg_atomic_init_u32(&(shared->pending_stream_count), 0);
+ shared->last_commit_end = InvalidXLogRecPtr;
+ shared->fileset_state = FS_EMPTY;
+
+ shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
+
+ /* Set up message queue for the worker. */
+ mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
+ shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
+ shm_mq_set_sender(mq, MyProc);
+
+ /* Attach the queue. */
+ winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
+
+ /* Set up error queue for the worker. */
+ mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
+ error_queue_size);
+ shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
+ shm_mq_set_receiver(mq, MyProc);
+
+ /* Attach the queue. */
+ winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
+
+ /* Return results to caller. */
+ winfo->dsm_seg = seg;
+ winfo->shared = shared;
+
+ return true;
+}
+
+/*
+ * Try to get a parallel apply worker from the pool. If none is available then
+ * start a new one.
+ */
+static ParallelApplyWorkerInfo *
+pa_launch_parallel_worker(void)
+{
+ MemoryContext oldcontext;
+ bool launched;
+ ParallelApplyWorkerInfo *winfo;
+ ListCell *lc;
+
+ /* Try to get an available parallel apply worker from the worker pool. */
+ foreach(lc, ParallelApplyWorkerPool)
+ {
+ winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+ if (!winfo->in_use)
+ return winfo;
+ }
+
+ /*
+ * Start a new parallel apply worker.
+ *
+ * The worker info can be used for the lifetime of the worker process, so
+ * create it in a permanent context.
+ */
+ oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+ winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
+
+ /* Setup shared memory. */
+ if (!pa_setup_dsm(winfo))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ pfree(winfo);
+ return NULL;
+ }
+
+ launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ InvalidOid,
+ dsm_segment_handle(winfo->dsm_seg));
+
+ if (launched)
+ {
+ ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
+ }
+ else
+ {
+ pa_free_worker_info(winfo);
+ winfo = NULL;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return winfo;
+}
+
+/*
+ * Allocate a parallel apply worker that will be used for the specified xid.
+ *
+ * We first try to get an available worker from the pool, if any and then try
+ * to launch a new worker. On successful allocation, remember the worker
+ * information in the hash table so that we can get it later for processing the
+ * streaming changes.
+ */
+void
+pa_allocate_worker(TransactionId xid)
+{
+ bool found;
+ ParallelApplyWorkerInfo *winfo = NULL;
+ ParallelApplyWorkerEntry *entry;
+
+ if (!pa_can_start())
+ return;
+
+ /* First time through, initialize parallel apply worker state hashtable. */
+ if (!ParallelApplyTxnHash)
+ {
+ HASHCTL ctl;
+
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(TransactionId);
+ ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
+ ctl.hcxt = ApplyContext;
+
+ ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
+ 16, &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ winfo = pa_launch_parallel_worker();
+ if (!winfo)
+ return;
+
+ /* Create an entry for the requested transaction. */
+ entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
+ if (found)
+ elog(ERROR, "hash table corrupted");
+
+ /* Update the transaction information in shared memory. */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
+ winfo->shared->xid = xid;
+ SpinLockRelease(&winfo->shared->mutex);
+
+ winfo->in_use = true;
+ winfo->serialize_changes = false;
+ entry->winfo = winfo;
+ entry->xid = xid;
+}
+
+/*
+ * Find the assigned worker for the given transaction, if any.
+ */
+ParallelApplyWorkerInfo *
+pa_find_worker(TransactionId xid)
+{
+ bool found;
+ ParallelApplyWorkerEntry *entry;
+
+ if (!TransactionIdIsValid(xid))
+ return NULL;
+
+ if (!ParallelApplyTxnHash)
+ return NULL;
+
+ /* Return the cached parallel apply worker if valid. */
+ if (stream_apply_worker)
+ return stream_apply_worker;
+
+ /* Find an entry for the requested transaction. */
+ entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+ if (found)
+ {
+ /* The worker must not have exited. */
+ Assert(entry->winfo->in_use);
+ return entry->winfo;
+ }
+
+ return NULL;
+}
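A hedged sketch of how the leader drives these entry points on STREAM_START (the real caller lives in worker.c, which this diff does not show; the variable names here are assumed):

	pa_allocate_worker(xid);	/* silently does nothing if a PA can't be used */

	winfo = pa_find_worker(xid);
	if (winfo == NULL)
	{
		/* No parallel apply worker assigned; spool changes to a file. */
	}
	else if (!pa_send_data(winfo, s->len, s->data))
	{
		/* Send timed out; switch this transaction to PARTIAL_SERIALIZE. */
		pa_switch_to_partial_serialize(winfo, false);
	}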
+
+/*
+ * Makes the worker available for reuse.
+ *
+ * This removes the parallel apply worker entry from the hash table so that it
+ * can't be used. If there are enough workers in the pool, it stops the worker
+ * and frees the corresponding info. Otherwise it just marks the worker as
+ * available for reuse.
+ *
+ * For more information about the worker pool, see comments atop this file.
+ */
+static void
+pa_free_worker(ParallelApplyWorkerInfo *winfo)
+{
+ Assert(!am_parallel_apply_worker());
+ Assert(winfo->in_use);
+ Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
+
+ if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
+ elog(ERROR, "hash table corrupted");
+
+ /*
+ * Stop the worker if there are enough workers in the pool.
+ *
+ * XXX Additionally, we also stop the worker if the leader apply worker
+ * serialize part of the transaction data due to a send timeout. This is
+ * because the message could be partially written to the queue and there
+ * is no way to clean the queue other than resending the message until it
+ * succeeds. Instead of trying to send the data which anyway would have
+ * been serialized and then letting the parallel apply worker deal with
+ * the spurious message, we stop the worker.
+ */
+ if (winfo->serialize_changes ||
+ list_length(ParallelApplyWorkerPool) >
+ (max_parallel_apply_workers_per_subscription / 2))
+ {
+ int slot_no;
+ uint16 generation;
+
+ SpinLockAcquire(&winfo->shared->mutex);
+ generation = winfo->shared->logicalrep_worker_generation;
+ slot_no = winfo->shared->logicalrep_worker_slot_no;
+ SpinLockRelease(&winfo->shared->mutex);
+
+ logicalrep_pa_worker_stop(slot_no, generation);
+
+ pa_free_worker_info(winfo);
+
+ return;
+ }
+
+ winfo->in_use = false;
+ winfo->serialize_changes = false;
+}
+
+/*
+ * Free the parallel apply worker information and unlink the files with
+ * serialized changes if any.
+ */
+static void
+pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
+{
+ Assert(winfo);
+
+ if (winfo->mq_handle)
+ shm_mq_detach(winfo->mq_handle);
+
+ if (winfo->error_mq_handle)
+ shm_mq_detach(winfo->error_mq_handle);
+
+ /* Unlink the files with serialized changes. */
+ if (winfo->serialize_changes)
+ stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
+
+ if (winfo->dsm_seg)
+ dsm_detach(winfo->dsm_seg);
+
+ /* Remove from the worker pool. */
+ ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
+
+ pfree(winfo);
+}
+
+/*
+ * Detach the error queue for all parallel apply workers.
+ */
+void
+pa_detach_all_error_mq(void)
+{
+ ListCell *lc;
+
+ foreach(lc, ParallelApplyWorkerPool)
+ {
+ ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ }
+}
+
+/*
+ * Check if there are any pending spooled messages.
+ */
+static bool
+pa_has_spooled_message_pending(void)
+{
+ PartialFileSetState fileset_state;
+
+ fileset_state = pa_get_fileset_state();
+
+ return (fileset_state != FS_EMPTY);
+}
+
+/*
+ * Replay the spooled messages once the leader apply worker has finished
+ * serializing changes to the file.
+ *
+ * Returns false if there aren't any pending spooled messages, true otherwise.
+ */
+static bool
+pa_process_spooled_messages_if_required(void)
+{
+ PartialFileSetState fileset_state;
+
+ fileset_state = pa_get_fileset_state();
+
+ if (fileset_state == FS_EMPTY)
+ return false;
+
+ /*
+ * If the leader apply worker is busy serializing the partial changes then
+ * acquire the stream lock now and wait for the leader worker to finish
+ * serializing the changes. Otherwise, the parallel apply worker won't get
+ * a chance to receive a STREAM_STOP (and acquire the stream lock) until
+ * the leader has serialized all changes, which can lead to an undetected
+ * deadlock.
+ *
+ * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
+ * worker has finished serializing the changes.
+ */
+ if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
+ {
+ pa_lock_stream(MyParallelShared->xid, AccessShareLock);
+ pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
+
+ fileset_state = pa_get_fileset_state();
+ }
+
+ /*
+ * We cannot read the file immediately after the leader has serialized all
+ * changes to the file because there may still be messages in the memory
+ * queue. We will apply all spooled messages the next time we call this
+ * function and that will ensure there are no messages left in the memory
+ * queue.
+ */
+ if (fileset_state == FS_SERIALIZE_DONE)
+ {
+ pa_set_fileset_state(MyParallelShared, FS_READY);
+ }
+ else if (fileset_state == FS_READY)
+ {
+ apply_spooled_messages(&MyParallelShared->fileset,
+ MyParallelShared->xid,
+ InvalidXLogRecPtr);
+ pa_set_fileset_state(MyParallelShared, FS_EMPTY);
+ }
+
+ return true;
+}
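For reference, a sketch of the fileset state progression implied by the code above (PartialFileSetState itself is declared elsewhere, presumably in worker_internal.h):

	/*
	 * FS_EMPTY
	 *   -> FS_SERIALIZE_IN_PROGRESS  (leader: pa_switch_to_partial_serialize)
	 *   -> FS_SERIALIZE_DONE         (leader: finished spilling all changes)
	 *   -> FS_READY                  (worker: memory queue known to be drained)
	 *   -> FS_EMPTY                  (worker: spooled messages applied)
	 */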
+
+/*
+ * Interrupt handler for main loop of parallel apply worker.
+ */
+static void
+ProcessParallelApplyInterrupts(void)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ if (ShutdownRequestPending)
+ {
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+}
+
+/* Parallel apply worker main loop. */
+static void
+LogicalParallelApplyLoop(shm_mq_handle *mqh)
+{
+ shm_mq_result shmq_res;
+ ErrorContextCallback errcallback;
+ MemoryContext oldcxt = CurrentMemoryContext;
+
+ /*
+ * Init the ApplyMessageContext which we clean up after each replication
+ * protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * Push apply error context callback. Fields will be filled while applying
+ * a change.
+ */
+ errcallback.callback = apply_error_callback;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ for (;;)
+ {
+ void *data;
+ Size len;
+
+ ProcessParallelApplyInterrupts();
+
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
+
+ shmq_res = shm_mq_receive(mqh, &len, &data, true);
+
+ if (shmq_res == SHM_MQ_SUCCESS)
+ {
+ StringInfoData s;
+ int c;
+
+ if (len == 0)
+ elog(ERROR, "invalid message length");
+
+ s.cursor = 0;
+ s.maxlen = -1;
+ s.data = (char *) data;
+ s.len = len;
+
+ /*
+ * The first byte of messages sent from leader apply worker to
+ * parallel apply workers can only be 'w'.
+ */
+ c = pq_getmsgbyte(&s);
+ if (c != 'w')
+ elog(ERROR, "unexpected message \"%c\"", c);
+
+ /*
+ * Ignore statistics fields that have been updated by the leader
+ * apply worker.
+ *
+ * XXX We can avoid sending the statistics fields from the leader
+ * apply worker but for that, it needs to rebuild the entire
+ * message by removing these fields which could be more work than
+ * simply ignoring these fields in the parallel apply worker.
+ */
+ s.cursor += SIZE_STATS_MESSAGE;
+
+ apply_dispatch(&s);
+ }
+ else if (shmq_res == SHM_MQ_WOULD_BLOCK)
+ {
+ /* Replay the changes from the file, if any. */
+ if (!pa_process_spooled_messages_if_required())
+ {
+ int rc;
+
+ /* Wait for more work. */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 1000L,
+ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+ }
+ else
+ {
+ Assert(shmq_res == SHM_MQ_DETACHED);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the logical replication apply worker")));
+ }
+
+ MemoryContextReset(ApplyMessageContext);
+ MemoryContextSwitchTo(oldcxt);
+ }
+
+ /* Pop the error context stack. */
+ error_context_stack = errcallback.previous;
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Make sure the leader apply worker tries to read from our error queue one more
+ * time. This guards against the case where we exit uncleanly without sending
+ * an ErrorResponse, for example because some code calls proc_exit directly.
+ */
+static void
+pa_shutdown(int code, Datum arg)
+{
+ SendProcSignal(MyLogicalRepWorker->apply_leader_pid,
+ PROCSIG_PARALLEL_APPLY_MESSAGE,
+ InvalidBackendId);
+
+ dsm_detach((dsm_segment *) DatumGetPointer(arg));
+}
+
+/*
+ * Parallel apply worker entry point.
+ */
+void
+ParallelApplyWorkerMain(Datum main_arg)
+{
+ ParallelApplyWorkerShared *shared;
+ dsm_handle handle;
+ dsm_segment *seg;
+ shm_toc *toc;
+ shm_mq *mq;
+ shm_mq_handle *mqh;
+ shm_mq_handle *error_mqh;
+ RepOriginId originid;
+ int worker_slot = DatumGetInt32(main_arg);
+ char originname[NAMEDATALEN];
+
+ /* Setup signal handling. */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ /*
+ * Attach to the dynamic shared memory segment for the parallel apply, and
+ * find its table of contents.
+ *
+ * Like parallel query, we don't need a resource owner by this time. See
+ * ParallelWorkerMain.
+ */
+ memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
+ seg = dsm_attach(handle);
+ if (!seg)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("unable to map dynamic shared memory segment")));
+
+ toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
+ if (!toc)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("bad magic number in dynamic shared memory segment")));
+
+ before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
+
+ /* Look up the shared information. */
+ shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
+ MyParallelShared = shared;
+
+ /*
+ * Attach to the message queue.
+ */
+ mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
+ shm_mq_set_receiver(mq, MyProc);
+ mqh = shm_mq_attach(mq, seg, NULL);
+
+ /*
+ * Primary initialization is complete. Now, we can attach to our slot.
+ * This is to ensure that the leader apply worker does not write data to
+ * the uninitialized memory queue.
+ */
+ logicalrep_worker_attach(worker_slot);
+
+ SpinLockAcquire(&MyParallelShared->mutex);
+ MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
+ MyParallelShared->logicalrep_worker_slot_no = worker_slot;
+ SpinLockRelease(&MyParallelShared->mutex);
+
+ /*
+ * Attach to the error queue.
+ */
+ mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
+ shm_mq_set_sender(mq, MyProc);
+ error_mqh = shm_mq_attach(mq, seg, NULL);
+
+ pq_redirect_to_shm_mq(seg, error_mqh);
+ pq_set_parallel_leader(MyLogicalRepWorker->apply_leader_pid,
+ InvalidBackendId);
+
+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = 0;
+
+ InitializeApplyWorker();
+
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ originname, sizeof(originname));
+ originid = replorigin_by_name(originname, false);
+
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->apply_leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+
+ /*
+ * Setup callback for syscache so that we know when something changes in
+ * the subscription relation state.
+ */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+ invalidate_syncing_table_states,
+ (Datum) 0);
+
+ set_apply_error_context_origin(originname);
+
+ LogicalParallelApplyLoop(mqh);
+
+ /*
+ * The parallel apply worker must not get here because the parallel apply
+ * worker will only stop when it receives a SIGTERM or SIGINT from the
+ * leader, or when there is an error. None of these cases will allow the
+ * code to reach here.
+ */
+ Assert(false);
+}
+
+/*
+ * Handle receipt of an interrupt indicating a parallel apply worker message.
+ *
+ * Note: this is called within a signal handler! All we can do is set a flag
+ * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * HandleParallelApplyMessages().
+ */
+void
+HandleParallelApplyMessageInterrupt(void)
+{
+ InterruptPending = true;
+ ParallelApplyMessagePending = true;
+ SetLatch(MyLatch);
+}
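A sketch of the dispatch path this flag feeds; the actual check lives in the core ProcessInterrupts(), outside this diff:

	/* Inside CHECK_FOR_INTERRUPTS() -> ProcessInterrupts(), roughly: */
	if (ParallelApplyMessagePending)
		HandleParallelApplyMessages();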
+
+/*
+ * Handle a single protocol message received from a single parallel apply
+ * worker.
+ */
+static void
+HandleParallelApplyMessage(StringInfo msg)
+{
+ char msgtype;
+
+ msgtype = pq_getmsgbyte(msg);
+
+ switch (msgtype)
+ {
+ case 'E': /* ErrorResponse */
+ {
+ ErrorData edata;
+
+ /* Parse ErrorResponse. */
+ pq_parse_errornotice(msg, &edata);
+
+ /*
+ * If desired, add a context line to show that this is a
+ * message propagated from a parallel apply worker. Otherwise,
+ * it can sometimes be confusing to understand what actually
+ * happened.
+ */
+ if (edata.context)
+ edata.context = psprintf("%s\n%s", edata.context,
+ _("logical replication parallel apply worker"));
+ else
+ edata.context = pstrdup(_("logical replication parallel apply worker"));
+
+ /*
+ * Context beyond that should use the error context callbacks
+ * that were in effect in LogicalRepApplyLoop().
+ */
+ error_context_stack = apply_error_context_stack;
+
+ /*
+ * The actual error must have been reported by the parallel
+ * apply worker.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker exited due to error"),
+ errcontext("%s", edata.context)));
+ }
+
+ /*
+ * Don't need to do anything about NoticeResponse and
+ * NotifyResponse as the logical replication worker doesn't need
+ * to send messages to the client.
+ */
+ case 'N':
+ case 'A':
+ break;
+
+ default:
+ elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
+ msgtype, msg->len);
+ }
+}
+
+/*
+ * Handle any queued protocol messages received from parallel apply workers.
+ */
+void
+HandleParallelApplyMessages(void)
+{
+ ListCell *lc;
+ MemoryContext oldcontext;
+
+ static MemoryContext hpam_context = NULL;
+
+ /*
+ * This is invoked from ProcessInterrupts(), and since some of the
+ * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
+ * for recursive calls if more signals are received while this runs. It's
+ * unclear that recursive entry would be safe, and it doesn't seem useful
+ * even if it is safe, so let's block interrupts until done.
+ */
+ HOLD_INTERRUPTS();
+
+ /*
+ * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
+ * don't want to risk leaking data into long-lived contexts, so let's do
+ * our work here in a private context that we can reset on each use.
+ */
+ if (!hpam_context) /* first time through? */
+ hpam_context = AllocSetContextCreate(TopMemoryContext,
+ "HandleParallelApplyMessages",
+ ALLOCSET_DEFAULT_SIZES);
+ else
+ MemoryContextReset(hpam_context);
+
+ oldcontext = MemoryContextSwitchTo(hpam_context);
+
+ ParallelApplyMessagePending = false;
+
+ foreach(lc, ParallelApplyWorkerPool)
+ {
+ shm_mq_result res;
+ Size nbytes;
+ void *data;
+ ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
+
+ /*
+ * The leader will detach from the error queue and set it to NULL
+ * before preparing to stop all parallel apply workers, so we don't
+ * need to handle error messages anymore. See
+ * logicalrep_worker_detach.
+ */
+ if (!winfo->error_mq_handle)
+ continue;
+
+ res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
+
+ if (res == SHM_MQ_WOULD_BLOCK)
+ continue;
+ else if (res == SHM_MQ_SUCCESS)
+ {
+ StringInfoData msg;
+
+ initStringInfo(&msg);
+ appendBinaryStringInfo(&msg, data, nbytes);
+ HandleParallelApplyMessage(&msg);
+ pfree(msg.data);
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the logical replication parallel apply worker")));
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Might as well clear the context on our way out */
+ MemoryContextReset(hpam_context);
+
+ RESUME_INTERRUPTS();
+}
+
+/*
+ * Send the data to the specified parallel apply worker via shared-memory
+ * queue.
+ *
+ * Returns false if the attempt to send data via shared memory times out, true
+ * otherwise.
+ */
+bool
+pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
+{
+ int rc;
+ shm_mq_result result;
+ TimestampTz startTime = 0;
+
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+
+/*
+ * This timeout is a bit arbitrary but testing revealed that it is sufficient
+ * to send the message unless the parallel apply worker is waiting on some
+ * lock or there is a serious resource crunch. See the comments atop this file
+ * to know why we are using a non-blocking way to send the message.
+ */
+#define SHM_SEND_RETRY_INTERVAL_MS 1000
+#define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
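/*
 * Note: startTime is only set after the first retry interval has elapsed,
 * so the effective total wait before serializing is about
 * SHM_SEND_RETRY_INTERVAL_MS + SHM_SEND_TIMEOUT_MS = 10 seconds.
 */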
+
+ for (;;)
+ {
+ result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
+
+ if (result == SHM_MQ_SUCCESS)
+ return true;
+ else if (result == SHM_MQ_DETACHED)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not send data to shared-memory queue")));
+
+ Assert(result == SHM_MQ_WOULD_BLOCK);
+
+ /* Wait before retrying. */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ SHM_SEND_RETRY_INTERVAL_MS,
+ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (startTime == 0)
+ startTime = GetCurrentTimestamp();
+ else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
+ SHM_SEND_TIMEOUT_MS))
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
+ winfo->shared->xid)));
+ return false;
+ }
+ }
+}
+
+/*
+ * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
+ * that the current data and any subsequent data for this transaction will be
+ * serialized to a file. This is done to prevent possible deadlocks with
+ * another parallel apply worker (refer to the comments atop this file).
+ */
+void
+pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
+ bool stream_locked)
+{
+ /*
+ * The parallel apply worker could be stuck for some reason (say waiting
+ * on some lock by other backend), so stop trying to send data directly to
+ * it and start serializing data to the file instead.
+ */
+ winfo->serialize_changes = true;
+
+ /* Initialize the stream fileset. */
+ stream_start_internal(winfo->shared->xid, true);
+
+ /*
+ * Acquire the stream lock if it is not already held, to make sure that the
+ * parallel apply worker will wait for the leader to release the stream
+ * lock at the end of the transaction.
+ */
+ if (!stream_locked)
+ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
+}
+
+/*
+ * Wait until the parallel apply worker's transaction state has reached or
+ * exceeded the given xact_state.
+ */
+static void
+pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
+ ParallelTransState xact_state)
+{
+ for (;;)
+ {
+ /*
+ * Stop if the transaction state has reached or exceeded the given
+ * xact_state.
+ */
+ if (pa_get_xact_state(winfo->shared) >= xact_state)
+ break;
+
+ /* Wait to be signalled. */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10L,
+ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+
+ /* Reset the latch so we don't spin. */
+ ResetLatch(MyLatch);
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+}
+
+/*
+ * Wait until the parallel apply worker's transaction finishes.
+ */
+static void
+pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
+{
+ /*
+ * Wait until the parallel apply worker has set the state to
+ * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
+ * lock. This is to prevent the leader apply worker from acquiring the
+ * transaction lock earlier than the parallel apply worker.
+ */
+ pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
+
+ /*
+ * Wait for the transaction lock to be released. This is required to
+ * detect deadlock among leader and parallel apply workers. Refer to the
+ * comments atop this file.
+ */
+ pa_lock_transaction(winfo->shared->xid, AccessShareLock);
+ pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
+
+ /*
+ * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
+ * apply worker failed while applying changes causing the lock to be
+ * released.
+ */
+ if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the logical replication parallel apply worker")));
+}
+
+/*
+ * Set the transaction state for a given parallel apply worker.
+ */
+void
+pa_set_xact_state(ParallelApplyWorkerShared *wshared,
+ ParallelTransState xact_state)
+{
+ SpinLockAcquire(&wshared->mutex);
+ wshared->xact_state = xact_state;
+ SpinLockRelease(&wshared->mutex);
+}
+
+/*
+ * Get the transaction state for a given parallel apply worker.
+ */
+static ParallelTransState
+pa_get_xact_state(ParallelApplyWorkerShared *wshared)
+{
+ ParallelTransState xact_state;
+
+ SpinLockAcquire(&wshared->mutex);
+ xact_state = wshared->xact_state;
+ SpinLockRelease(&wshared->mutex);
+
+ return xact_state;
+}
+
+/*
+ * Cache the parallel apply worker information.
+ */
+void
+pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
+{
+ stream_apply_worker = winfo;
+}
+
+/*
+ * Form a unique savepoint name for the streaming transaction.
+ *
+ * Note that different subscriptions for publications on different nodes can
+ * receive the same remote xid, so we need to use the subscription id along
+ * with it.
+ *
+ * Returns the name in the supplied buffer.
+ */
+static void
+pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
+{
+ snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
+}
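For example (assumed values, for illustration only): suboid 16394 and remote xid 745 would produce the savepoint name pg_sp_16394_745.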
+
+/*
+ * Define a savepoint for a subxact in parallel apply worker if needed.
+ *
+ * The parallel apply worker can figure out if a new subtransaction was
+ * started by checking if the new change arrived with a different xid. In that
+ * case define a named savepoint, so that we are able to rollback to it
+ * if required.
+ */
+void
+pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
+{
+ if (current_xid != top_xid &&
+ !list_member_xid(subxactlist, current_xid))
+ {
+ MemoryContext oldctx;
+ char spname[NAMEDATALEN];
+
+ pa_savepoint_name(MySubscription->oid, current_xid,
+ spname, sizeof(spname));
+
+ elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
+
+ /* We must be in a transaction block to define the SAVEPOINT. */
+ if (!IsTransactionBlock())
+ {
+ if (!IsTransactionState())
+ StartTransactionCommand();
+
+ BeginTransactionBlock();
+ CommitTransactionCommand();
+ }
+
+ DefineSavepoint(spname);
+
+ /*
+ * CommitTransactionCommand is needed to start a subtransaction after
+ * issuing a SAVEPOINT inside a transaction block (see
+ * StartSubTransaction()).
+ */
+ CommitTransactionCommand();
+
+ oldctx = MemoryContextSwitchTo(TopTransactionContext);
+ subxactlist = lappend_xid(subxactlist, current_xid);
+ MemoryContextSwitchTo(oldctx);
+ }
+}
+
+/* Reset the list that maintains subtransactions. */
+void
+pa_reset_subtrans(void)
+{
+ /*
+ * We don't need to free this explicitly as the allocated memory will be
+ * freed at the transaction end.
+ */
+ subxactlist = NIL;
+}
+
+/*
+ * Handle STREAM ABORT message when the transaction was applied in a parallel
+ * apply worker.
+ */
+void
+pa_stream_abort(LogicalRepStreamAbortData *abort_data)
+{
+ TransactionId xid = abort_data->xid;
+ TransactionId subxid = abort_data->subxid;
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = abort_data->abort_lsn;
+ replorigin_session_origin_timestamp = abort_data->abort_time;
+
+ /*
+ * If the two XIDs are the same, it's in fact an abort of the toplevel xact, so
+ * just free the subxactlist.
+ */
+ if (subxid == xid)
+ {
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
+
+ /*
+ * Release the lock as we might be processing an empty streaming
+ * transaction in which case the lock won't be released during
+ * transaction rollback.
+ *
+ * Note that it's ok to release the transaction lock before aborting
+ * the transaction because even if the parallel apply worker dies due
+ * to crash or some other reason, such a transaction would still be
+ * considered aborted.
+ */
+ pa_unlock_transaction(xid, AccessExclusiveLock);
+
+ AbortCurrentTransaction();
+
+ if (IsTransactionBlock())
+ {
+ EndTransactionBlock(false);
+ CommitTransactionCommand();
+ }
+
+ pa_reset_subtrans();
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+ }
+ else
+ {
+ /* OK, so it's a subxact. Rollback to the savepoint. */
+ int i;
+ char spname[NAMEDATALEN];
+
+ pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
+
+ elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
+
+ /*
+ * Search the subxactlist, determine the offset tracked for the
+ * subxact, and truncate the list.
+ *
+ * Note that for an empty sub-transaction we won't find the subxid
+ * here.
+ */
+ for (i = list_length(subxactlist) - 1; i >= 0; i--)
+ {
+ TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
+
+ if (xid_tmp == subxid)
+ {
+ RollbackToSavepoint(spname);
+ CommitTransactionCommand();
+ subxactlist = list_truncate(subxactlist, i);
+ break;
+ }
+ }
+ }
+}
+
+/*
+ * Set the fileset state for a particular parallel apply worker. The fileset
+ * will be set once the leader worker has serialized all changes to the file
+ * so that it can be used by the parallel apply worker.
+ */
+void
+pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
+ PartialFileSetState fileset_state)
+{
+ SpinLockAcquire(&wshared->mutex);
+ wshared->fileset_state = fileset_state;
+
+ if (fileset_state == FS_SERIALIZE_DONE)
+ {
+ Assert(am_leader_apply_worker());
+ Assert(MyLogicalRepWorker->stream_fileset);
+ wshared->fileset = *MyLogicalRepWorker->stream_fileset;
+ }
+
+ SpinLockRelease(&wshared->mutex);
+}
+
+/*
+ * Get the fileset state for the current parallel apply worker.
+ */
+static PartialFileSetState
+pa_get_fileset_state(void)
+{
+ PartialFileSetState fileset_state;
+
+ Assert(am_parallel_apply_worker());
+
+ SpinLockAcquire(&MyParallelShared->mutex);
+ fileset_state = MyParallelShared->fileset_state;
+ SpinLockRelease(&MyParallelShared->mutex);
+
+ return fileset_state;
+}
+
+/*
+ * Helper functions to acquire and release a lock for each stream block.
+ *
+ * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
+ * stream lock.
+ *
+ * Refer to the comments atop this file to see how the stream lock is used.
+ */
+void
+pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
+{
+ LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
+ PARALLEL_APPLY_LOCK_STREAM, lockmode);
+}
+
+void
+pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
+{
+ UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
+ PARALLEL_APPLY_LOCK_STREAM, lockmode);
+}
+
+/*
+ * Helper functions to acquire and release a lock for each local transaction
+ * apply.
+ *
+ * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
+ * transaction lock.
+ *
+ * Note that all the callers must pass a remote transaction ID instead of a
+ * local transaction ID as xid. This is because the local transaction ID will
+ * only be assigned while applying the first change in the parallel apply but
+ * it's possible that the first change in the parallel apply worker is blocked
+ * by a concurrently executing transaction in another parallel apply worker. We
+ * can only communicate the local transaction id to the leader after applying
+ * the first change so it won't be able to wait after sending the xact finish
+ * command using this lock.
+ *
+ * Refer to the comments atop this file to see how the transaction lock is
+ * used.
+ */
+void
+pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
+{
+ LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
+ PARALLEL_APPLY_LOCK_XACT, lockmode);
+}
+
+void
+pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
+{
+ UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
+ PARALLEL_APPLY_LOCK_XACT, lockmode);
+}
+
+/*
+ * Decrement the number of pending streaming blocks and wait on the stream lock
+ * if there is no pending block available.
+ */
+void
+pa_decr_and_wait_stream_block(void)
+{
+ Assert(am_parallel_apply_worker());
+
+ /*
+ * It is only possible to not have any pending stream chunks when we are
+ * applying spooled messages.
+ */
+ if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
+ {
+ if (pa_has_spooled_message_pending())
+ return;
+
+ elog(ERROR, "invalid pending streaming chunk 0");
+ }
+
+ if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
+ {
+ pa_lock_stream(MyParallelShared->xid, AccessShareLock);
+ pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
+ }
+}
+
+/*
+ * Finish processing the streaming transaction in the leader apply worker.
+ */
+void
+pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
+{
+ Assert(am_leader_apply_worker());
+
+ /*
+ * Unlock the shared object lock so that parallel apply worker can
+ * continue to receive and apply changes.
+ */
+ pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+
+ /*
+ * Wait for that worker to finish. This is necessary to maintain commit
+ * order which avoids failures due to transaction dependencies and
+ * deadlocks.
+ */
+ pa_wait_for_xact_finish(winfo);
+
+ if (!XLogRecPtrIsInvalid(remote_lsn))
+ store_flush_position(remote_lsn, winfo->shared->last_commit_end);
+
+ pa_free_worker(winfo);
+}
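Finally, a hedged sketch of the leader's finish path for a streamed transaction; the actual caller is in worker.c (not shown in this diff) and the variable names are assumed:

	/* Leader, on STREAM_COMMIT of the transaction assigned to winfo: */
	if (pa_send_data(winfo, s->len, s->data))	/* forward the commit message */
		pa_xact_finish(winfo, remote_lsn);	/* unlock stream, wait for PA, recycle */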
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c3bc8ecc926..a53e23c679d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -822,10 +822,11 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
- buf->record->EndRecPtr);
+ buf->record->EndRecPtr, abort_time);
}
- ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
+ abort_time);
}
/* update the decoding stats */
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c050..afb7acddaa6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -55,6 +55,7 @@
/* GUC variables */
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
+int max_parallel_apply_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -74,6 +75,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static int logicalrep_pa_worker_count(Oid subid);
static bool on_commit_launcher_wakeup = false;
@@ -152,8 +154,10 @@ get_subscription_list(void)
*
* This is only needed for cleaning up the shared memory in case the worker
* fails to attach.
+ *
+ * Returns whether the attach was successful.
*/
-static void
+static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
@@ -169,11 +173,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- /* Worker either died or has started; no need to do anything. */
+ /* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
- return;
+ return worker->in_use;
}
LWLockRelease(LogicalRepWorkerLock);
@@ -188,7 +192,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
- return;
+ return false;
}
/*
@@ -210,6 +214,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
* subscription id and relid.
+ *
+ * We are only interested in the leader apply worker or table sync worker.
*/
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
@@ -224,6 +230,10 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ /* Skip parallel apply workers. */
+ if (isParallelApplyWorker(w))
+ continue;
+
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@@ -260,11 +270,13 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
/*
- * Start new apply background worker, if possible.
+ * Start new logical replication background worker, if possible.
+ *
+ * Returns true on success, false on failure.
*/
-void
+bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+ Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -273,7 +285,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int slot = 0;
LogicalRepWorker *worker = NULL;
int nsyncworkers;
+ int nparallelapplyworkers;
TimestampTz now;
+ bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+
+ /* Sanity check - tablesync worker cannot be a subworker */
+ Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -351,7 +368,20 @@ retry:
if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
- return;
+ return false;
+ }
+
+ nparallelapplyworkers = logicalrep_pa_worker_count(subid);
+
+ /*
+ * Return false if the number of parallel apply workers has reached the limit
+ * per subscription.
+ */
+ if (is_parallel_apply_worker &&
+ nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return false;
}
/*
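
Note: with the boolean return value and the new subworker_dsm parameter, the
launch call for a parallel apply worker is expected to look roughly like the
sketch below, from the leader's perspective (winfo and the fallback are
illustrative; see applyparallelworker.c for the real call site):

    /* Sketch: leader launching a parallel apply worker (names illustrative). */
    bool        launched;

    launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
                                        MySubscription->oid,
                                        MySubscription->name,
                                        MyLogicalRepWorker->userid,
                                        InvalidOid, /* no relid: not a tablesync worker */
                                        dsm_segment_handle(winfo->dsm_seg));

    if (!launched)
    {
        /* Fall back to serializing this transaction to a file. */
    }
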
@@ -365,7 +395,7 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
errhint("You might need to increase max_logical_replication_workers.")));
- return;
+ return false;
}
/* Prepare the worker slot. */
@@ -380,6 +410,8 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
+ worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
+ worker->parallel_apply = is_parallel_apply_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -397,19 +429,34 @@ retry:
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+
+ if (is_parallel_apply_worker)
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+ else
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+
if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u sync %u", subid, relid);
+ else if (is_parallel_apply_worker)
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel apply worker for subscription %u", subid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u", subid);
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
+ "logical replication apply worker for subscription %u", subid);
+
+ if (is_parallel_apply_worker)
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+ else
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
+ if (is_parallel_apply_worker)
+ memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
/* Failed to start worker, so clean up the worker slot. */
@@ -422,33 +469,23 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase max_worker_processes.")));
- return;
+ return false;
}
/* Now wait until it attaches. */
- WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
+ return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
/*
- * Stop the logical replication worker for subid/relid, if any, and wait until
- * it detaches from the slot.
+ * Internal function to stop the worker and wait until it detaches from the
+ * slot.
*/
-void
-logicalrep_worker_stop(Oid subid, Oid relid)
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
- LogicalRepWorker *worker;
uint16 generation;
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- worker = logicalrep_worker_find(subid, relid, false);
-
- /* No worker, nothing to do. */
- if (!worker)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
+ Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
/*
* Remember which generation was our worker so we can check if what we see
@@ -486,10 +523,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
* different, meaning that a different worker has taken the slot.
*/
if (!worker->in_use || worker->generation != generation)
- {
- LWLockRelease(LogicalRepWorkerLock);
return;
- }
/* Worker has assigned proc, so it has started. */
if (worker->proc)
@@ -497,7 +531,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/* Now terminate the worker ... */
- kill(worker->proc->pid, SIGTERM);
+ kill(worker->proc->pid, signo);
/* ... and wait for it to die. */
for (;;)
@@ -523,6 +557,53 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
}
+}
+
+/*
+ * Stop the logical replication worker for subid/relid, if any.
+ */
+void
+logicalrep_worker_stop(Oid subid, Oid relid)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = logicalrep_worker_find(subid, relid, false);
+
+ if (worker)
+ {
+ Assert(!isParallelApplyWorker(worker));
+ logicalrep_worker_stop_internal(worker, SIGTERM);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Stop the logical replication parallel apply worker corresponding to the
+ * input slot number.
+ *
+ * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
+ * worker so that the worker exits cleanly.
+ */
+void
+logicalrep_pa_worker_stop(int slot_no, uint16 generation)
+{
+ LogicalRepWorker *worker;
+
+ Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = &LogicalRepCtx->workers[slot_no];
+ Assert(isParallelApplyWorker(worker));
+
+ /*
+ * Only stop the worker if the generation matches and the worker is alive.
+ */
+ if (worker->generation == generation && worker->proc)
+ logicalrep_worker_stop_internal(worker, SIGINT);
LWLockRelease(LogicalRepWorkerLock);
}
@@ -595,11 +676,40 @@ logicalrep_worker_attach(int slot)
}
/*
- * Detach the worker (cleans up the worker info).
+ * Stop the parallel apply workers if any, and detach the leader apply worker
+ * (cleans up the worker info).
*/
static void
logicalrep_worker_detach(void)
{
+ /* Stop the parallel apply workers. */
+ if (am_leader_apply_worker())
+ {
+ List *workers;
+ ListCell *lc;
+
+ /*
+ * Detach from the error_mq_handle for all parallel apply workers
+ * before terminating them. This prevents the leader apply worker from
+ * receiving the worker termination message and sending it to logs
+ * when the same is already done by the parallel worker.
+ */
+ pa_detach_all_error_mq();
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+ foreach(lc, workers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+
+ if (isParallelApplyWorker(w))
+ logicalrep_worker_stop_internal(w, SIGTERM);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+ }
+
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
@@ -622,6 +732,8 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->relid = InvalidOid;
+ worker->apply_leader_pid = InvalidPid;
+ worker->parallel_apply = false;
}
/*
@@ -653,6 +765,13 @@ logicalrep_worker_onexit(int code, Datum arg)
if (MyLogicalRepWorker->stream_fileset != NULL)
FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
+ /*
+ * Session level locks may be acquired outside of a transaction in
+ * parallel apply mode and will not be released when the worker
+ * terminates, so manually release all locks before the worker exits.
+ */
+ LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+
ApplyLauncherWakeup();
}
@@ -681,6 +800,33 @@ logicalrep_sync_worker_count(Oid subid)
}
/*
+ * Count the number of registered (but not necessarily running) parallel apply
+ * workers for a subscription.
+ */
+static int
+logicalrep_pa_worker_count(Oid subid)
+{
+ int i;
+ int res = 0;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /*
+ * Scan all worker slots, counting only the parallel apply workers registered
+ * for the given subscription id.
+ */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->subid == subid && isParallelApplyWorker(w))
+ res++;
+ }
+
+ return res;
+}
+
+/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -869,7 +1015,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}
}
@@ -952,6 +1098,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if (OidIsValid(subid) && worker.subid != subid)
continue;
+ /* Skip if this is a parallel apply worker */
+ if (isParallelApplyWorker(&worker))
+ continue;
+
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
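
Note: on the other end, ParallelApplyWorkerMain presumably recovers the DSM
handle that logicalrep_worker_launch copied into bgw_extra above and attaches
to the segment; a minimal sketch (error wording illustrative):

    dsm_handle  handle;
    dsm_segment *seg;

    /* Recover the handle smuggled through bgw_extra at registration time. */
    memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));

    seg = dsm_attach(handle);
    if (seg == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("could not map dynamic shared memory segment")));
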
diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build
index 7144d7a8a42..d48cd4c5901 100644
--- a/src/backend/replication/logical/meson.build
+++ b/src/backend/replication/logical/meson.build
@@ -1,6 +1,7 @@
# Copyright (c) 2022-2023, PostgreSQL Global Development Group
backend_sources += files(
+ 'applyparallelworker.c',
'decode.c',
'launcher.c',
'logical.c',
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index fc86768ed64..b754c43840f 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
* array doesn't have to be searched when calling
* replorigin_session_advance().
*
- * Obviously only one such cached origin can exist per process and the current
- * cached value can only be set again after the previous value is torn down
- * with replorigin_session_reset().
+ * Normally only one such cached origin can exist per process so the cached
+ * value can only be set again after the previous value is torn down with
+ * replorigin_session_reset(). For this normal case pass acquired_by = 0
+ * (meaning the slot is not allowed to be already acquired by another process).
+ *
+ * However, sometimes multiple processes can safely re-use the same origin slot
+ * (for example, multiple parallel apply processes can safely use the same
+ * origin, provided they maintain commit order by allowing only one process to
+ * commit at a time). For this case the first process must pass acquired_by =
+ * 0, and then the other processes sharing that same origin can pass
+ * acquired_by = PID of the first process.
*/
void
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, int acquired_by)
{
static bool registered_cleanup;
int i;
@@ -1122,7 +1130,7 @@ replorigin_session_setup(RepOriginId node)
if (curstate->roident != node)
continue;
- else if (curstate->acquired_by != 0)
+ else if (curstate->acquired_by != 0 && acquired_by == 0)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
@@ -1153,7 +1161,11 @@ replorigin_session_setup(RepOriginId node)
Assert(session_replication_state->roident != InvalidRepOriginId);
- session_replication_state->acquired_by = MyProcPid;
+ if (acquired_by == 0)
+ session_replication_state->acquired_by = MyProcPid;
+ else if (session_replication_state->acquired_by != acquired_by)
+ elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
+ node, acquired_by);
LWLockRelease(ReplicationOriginLock);
@@ -1337,7 +1349,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin);
+ replorigin_session_setup(origin, 0);
replorigin_session_origin = origin;
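
Note: the sharing rule is easiest to see from the two call sites involved; a
usage sketch (originid and leader_pid are illustrative, the latter being the
leader apply worker's PID as found in the shared worker slot):

    /* Leader apply worker: acquire the origin slot exclusively. */
    replorigin_session_setup(originid, 0);

    /* Parallel apply worker: attach to the slot already held by the leader. */
    replorigin_session_setup(originid, leader_pid);
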
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index d00e90dd5e0..3a9d53a61ed 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -1164,10 +1164,14 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
/*
* Write STREAM ABORT to the output stream. Note that xid and subxid will be
* same for the top-level transaction abort.
+ *
+ * If write_abort_info is true, send the abort_lsn and abort_time fields,
+ * otherwise don't.
*/
void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
- TransactionId subxid)
+ TransactionId subxid, XLogRecPtr abort_lsn,
+ TimestampTz abort_time, bool write_abort_info)
{
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
@@ -1176,19 +1180,40 @@ logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
/* transaction ID */
pq_sendint32(out, xid);
pq_sendint32(out, subxid);
+
+ if (write_abort_info)
+ {
+ pq_sendint64(out, abort_lsn);
+ pq_sendint64(out, abort_time);
+ }
}
/*
* Read STREAM ABORT from the output stream.
+ *
+ * If read_abort_info is true, read the abort_lsn and abort_time fields,
+ * otherwise don't.
*/
void
-logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
- TransactionId *subxid)
+logicalrep_read_stream_abort(StringInfo in,
+ LogicalRepStreamAbortData *abort_data,
+ bool read_abort_info)
{
- Assert(xid && subxid);
+ Assert(abort_data);
+
+ abort_data->xid = pq_getmsgint(in, 4);
+ abort_data->subxid = pq_getmsgint(in, 4);
- *xid = pq_getmsgint(in, 4);
- *subxid = pq_getmsgint(in, 4);
+ if (read_abort_info)
+ {
+ abort_data->abort_lsn = pq_getmsgint64(in);
+ abort_data->abort_time = pq_getmsgint64(in);
+ }
+ else
+ {
+ abort_data->abort_lsn = InvalidXLogRecPtr;
+ abort_data->abort_time = 0;
+ }
}
/*
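
Note: the resulting wire format of STREAM ABORT is Byte1('A'), Int32 xid,
Int32 subxid, then optionally Int64 abort_lsn and Int64 abort_time. On the
publisher, whether the optional fields are written presumably keys off the
negotiated streaming mode, along these lines (a sketch; data->streaming as a
tri-valued setting and txn->final_lsn as the abort LSN are assumptions):

    /* Sketch: pgoutput choosing whether to include the abort information. */
    bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);

    logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid,
                                  txn->final_lsn, txn->xact_time.abort_time,
                                  write_abort_info);
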
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 29a018c58af..54ee824e6c4 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2873,7 +2873,8 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* disk.
*/
void
-ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
+ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+ TimestampTz abort_time)
{
ReorderBufferTXN *txn;
@@ -2884,6 +2885,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
if (txn == NULL)
return;
+ txn->xact_time.abort_time = abort_time;
+
/* For streamed transactions notify the remote node about the abort. */
if (rbtxn_is_streamed(txn))
{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 09b3e8b32ac..38dfce71296 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -14,7 +14,7 @@
* The initial data synchronization is done separately for each table,
* in a separate apply worker that only fetches the initial snapshot data
* from the publisher and then synchronizes the position in the stream with
- * the main apply worker.
+ * the leader apply worker.
*
* There are several reasons for doing the synchronization this way:
* - It allows us to parallelize the initial data synchronization
@@ -153,7 +153,7 @@ finish_sync_worker(void)
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
- /* Find the main apply worker and signal it. */
+ /* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
@@ -588,7 +588,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
- rstate->relid);
+ rstate->relid,
+ DSM_HANDLE_INVALID);
hentry->last_start_time = now;
}
}
@@ -636,6 +637,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
+ /*
+ * Skip for parallel apply workers because they only operate on tables
+ * that are in a READY state. See pa_can_start() and
+ * should_apply_changes_for_rel().
+ */
+ if (am_parallel_apply_worker())
+ return;
+
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
@@ -1254,7 +1263,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Here we use the slot name instead of the subscription name as the
- * application_name, so that it is different from the main apply worker,
+ * application_name, so that it is different from the leader apply worker,
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
@@ -1302,7 +1311,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* time this tablesync was launched.
*/
originid = replorigin_by_name(originname, false);
- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
@@ -1413,7 +1422,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
true /* go backward */ , true /* WAL log */ );
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
}
else
@@ -1468,8 +1477,8 @@ copy_table_done:
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
- * Finally, wait until the main apply worker tells us to catch up and then
- * return to let LogicalRepApplyLoop do it.
+ * Finally, wait until the leader apply worker tells us to catch up and
+ * then return to let LogicalRepApplyLoop do it.
*/
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8649e142c3..79cda394453 100644..100755
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -22,8 +22,12 @@
* STREAMED TRANSACTIONS
* ---------------------
* Streamed transactions (large transactions exceeding a memory limit on the
- * upstream) are not applied immediately, but instead, the data is written
- * to temporary files and then applied at once when the final commit arrives.
+ * upstream) are applied using one of two approaches:
+ *
+ * 1) Write to temporary files and apply when the final commit arrives
+ *
+ * This approach is used when the user has set the subscription's streaming
+ * option as on.
*
* Unlike the regular (non-streamed) case, handling streamed transactions has
* to handle aborts of both the toplevel transaction and subtransactions. This
@@ -50,6 +54,12 @@
* the file we desired across multiple stream-open calls for the same
* transaction.
*
+ * 2) Parallel apply workers.
+ *
+ * This approach is used when the user has set the subscription's streaming
+ * option as parallel. See logical/applyparallelworker.c for information about
+ * this approach.
+ *
* TWO_PHASE TRANSACTIONS
* ----------------------
* Two phase transactions are replayed at prepare and then committed or
@@ -233,7 +243,52 @@ typedef struct ApplyErrorCallbackArg
char *origin_name;
} ApplyErrorCallbackArg;
-static ApplyErrorCallbackArg apply_error_callback_arg =
+/*
+ * The action to be taken for the changes in the transaction.
+ *
+ * TRANS_LEADER_APPLY:
+ * This action means that we are in the leader apply worker and changes of the
+ * transaction are applied directly by the worker.
+ *
+ * TRANS_LEADER_SERIALIZE:
+ * This action means that we are in the leader apply worker or table sync
+ * worker. Changes are written to temporary files and then applied when the
+ * final commit arrives.
+ *
+ * TRANS_LEADER_SEND_TO_PARALLEL:
+ * This action means that we are in the leader apply worker and need to send
+ * the changes to the parallel apply worker.
+ *
+ * TRANS_LEADER_PARTIAL_SERIALIZE:
+ * This action means that we are in the leader apply worker and have sent some
+ * changes directly to the parallel apply worker and the remaining changes are
+ * serialized to a file, due to a timeout while sending data. The parallel apply
+ * worker will apply these serialized changes when the final commit arrives.
+ *
+ * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
+ * serializing changes, the leader worker also needs to serialize the
+ * STREAM_XXX message to a file, and wait for the parallel apply worker to
+ * finish the transaction when processing the transaction finish command. So
+ * this new action was introduced to keep the code and logic clear.
+ *
+ * TRANS_PARALLEL_APPLY:
+ * This action means that we are in the parallel apply worker and changes of
+ * the transaction are applied directly by the worker.
+ */
+typedef enum
+{
+ /* The action for non-streaming transactions. */
+ TRANS_LEADER_APPLY,
+
+ /* Actions for streaming transactions. */
+ TRANS_LEADER_SERIALIZE,
+ TRANS_LEADER_SEND_TO_PARALLEL,
+ TRANS_LEADER_PARTIAL_SERIALIZE,
+ TRANS_PARALLEL_APPLY
+} TransApplyAction;
+
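
Note: the mapping from the current worker and transaction state to one of these
actions is centralized in get_transaction_apply_action() (its prototype appears
with the others below). Its decision logic is roughly the following sketch,
assuming the pa_find_worker() lookup and the serialize_changes flag provided by
the new applyparallelworker.c:

    static TransApplyAction
    get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
    {
        *winfo = NULL;

        if (am_parallel_apply_worker())
            return TRANS_PARALLEL_APPLY;

        /* Non-streamed transactions are applied directly by the leader. */
        if (in_remote_transaction)
            return TRANS_LEADER_APPLY;

        /* Is a parallel apply worker assigned to this transaction? */
        *winfo = pa_find_worker(xid);

        if (*winfo == NULL)
            return TRANS_LEADER_SERIALIZE;

        if ((*winfo)->serialize_changes)
            return TRANS_LEADER_PARTIAL_SERIALIZE;

        return TRANS_LEADER_SEND_TO_PARALLEL;
    }
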
+/* errcontext tracker */
+ApplyErrorCallbackArg apply_error_callback_arg =
{
.command = 0,
.rel = NULL,
@@ -243,7 +298,9 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.origin_name = NULL,
};
-static MemoryContext ApplyMessageContext = NULL;
+ErrorContextCallback *apply_error_context_stack = NULL;
+
+MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
@@ -265,16 +322,25 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
/*
+ * The number of changes applied by the parallel apply worker during one
+ * streaming block.
+ */
+static uint32 parallel_stream_nchanges = 0;
+
+/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
* Once we start skipping changes, we don't stop it until we skip all changes of
* the transaction even if pg_subscription is updated and MySubscription->skiplsn
- * gets changed or reset during that. Also, in streaming transaction cases, we
- * don't skip receiving and spooling the changes since we decide whether or not
+ * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
+ * we don't skip receiving and spooling the changes since we decide whether or not
* to skip applying the changes when starting to apply changes. The subskiplsn is
* cleared after successfully skipping the transaction or applying non-empty
* transaction. The latter prevents the mistakenly specified subskiplsn from
- * being left.
+ * being left. Note that we cannot skip the streaming transactions when using
+ * parallel apply workers because we cannot get the finish LSN before applying
+ * the changes. So, we don't start a parallel apply worker when the finish LSN
+ * is set by the user.
*/
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
@@ -314,23 +380,16 @@ static inline void cleanup_subxact_info(void);
/*
* Serialize and deserialize changes for a toplevel transaction.
*/
-static void stream_cleanup_files(Oid subid, TransactionId xid);
static void stream_open_file(Oid subid, TransactionId xid,
bool first_segment);
static void stream_write_change(char action, StringInfo s);
+static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
-static void store_flush_position(XLogRecPtr remote_lsn);
-
-static void maybe_reread_subscription(void);
-
static void DisableSubscriptionAndExit(void);
-/* prototype needed because of stream_commit */
-static void apply_dispatch(StringInfo s);
-
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -354,19 +413,32 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
-/* Common streaming function to apply all the spooled messages */
-static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
-
/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
/* Functions for apply error callback */
-static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
+static TransApplyAction get_transaction_apply_action(TransactionId xid,
+ ParallelApplyWorkerInfo **winfo);
+
+/*
+ * Return the name of the logical replication worker.
+ */
+static const char *
+get_worker_name(void)
+{
+ if (am_tablesync_worker())
+ return _("logical replication table synchronization worker");
+ else if (am_parallel_apply_worker())
+ return _("logical replication parallel apply worker");
+ else
+ return _("logical replication apply worker");
+}
+
/*
* Form the origin name for the subscription.
*
@@ -396,19 +468,43 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
*
* This is mainly needed for initial relation data sync as that runs in
* separate worker process running in parallel and we need some way to skip
- * changes coming to the main apply worker during the sync of a table.
+ * changes coming to the leader apply worker during the sync of a table.
*
* Note we need to do smaller or equals comparison for SYNCDONE state because
* it might hold position of end of initial slot consistent point WAL
* record + 1 (ie start of next record) and next record can be COMMIT of
* transaction we are now processing (which is what we set remote_final_lsn
* to in apply_handle_begin).
+ *
+ * Note that for streaming transactions that are being applied in the parallel
+ * apply worker, we disallow applying changes if the target table in the
+ * subscription is not in the READY state, because we cannot decide whether to
+ * apply the change as we won't know remote_final_lsn by that time.
+ *
+ * We already checked this in pa_can_start() before assigning the
+ * streaming transaction to the parallel worker, but it also needs to be
+ * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
+ * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
+ * while applying this transaction.
*/
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
if (am_tablesync_worker())
return MyLogicalRepWorker->relid == rel->localreloid;
+ else if (am_parallel_apply_worker())
+ {
+ /* We don't synchronize rels that are in an unknown state. */
+ if (rel->state != SUBREL_STATE_READY &&
+ rel->state != SUBREL_STATE_UNKNOWN)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+ return rel->state == SUBREL_STATE_READY;
+ }
else
return (rel->state == SUBREL_STATE_READY ||
(rel->state == SUBREL_STATE_SYNCDONE &&
@@ -454,43 +550,110 @@ end_replication_step(void)
}
/*
- * Handle streamed transactions.
+ * Handle streamed transactions for both the leader apply worker and the
+ * parallel apply workers.
+ *
+ * In the streaming case (receiving a block of the streamed transaction), for
+ * serialize mode, simply redirect it to a file for the proper toplevel
+ * transaction, and for parallel mode, the leader apply worker will send the
+ * changes to parallel apply workers and the parallel apply worker will define
+ * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
+ * messages will be applied by both leader apply worker and parallel apply
+ * workers).
*
- * If in streaming mode (receiving a block of streamed transaction), we
- * simply redirect it to a file for the proper toplevel transaction.
+ * Returns true for streamed transactions (when the change is either serialized
+ * to a file or sent to a parallel apply worker), false otherwise (regular mode,
+ * or when the parallel apply worker must process the change itself).
*
- * Returns true for streamed transactions, false otherwise (regular mode).
+ * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
+ * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
+ * to a parallel apply worker.
*/
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
- TransactionId xid;
+ TransactionId current_xid;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+ StringInfoData original_msg;
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
- if (!in_streamed_transaction)
+ if (apply_action == TRANS_LEADER_APPLY)
return false;
- Assert(stream_fd != NULL);
Assert(TransactionIdIsValid(stream_xid));
/*
+ * The parallel apply worker needs the xid in this message to decide
+ * whether to define a savepoint, so save a copy of the message before its
+ * cursor has advanced past the xid. We will serialize this message to a
+ * file in PARTIAL_SERIALIZE mode.
+ */
+ original_msg = *s;
+
+ /*
* We should have received XID of the subxact as the first part of the
* message, so extract it.
*/
- xid = pq_getmsgint(s, 4);
+ current_xid = pq_getmsgint(s, 4);
- if (!TransactionIdIsValid(xid))
+ if (!TransactionIdIsValid(current_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- /* Add the new subxact to the array (unless already there). */
- subxact_info_add(xid);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
+ Assert(stream_fd);
+
+ /* Add the new subxact to the array (unless already there). */
+ subxact_info_add(current_xid);
- /* write the change to the current file */
- stream_write_change(action, s);
+ /* Write the change to the current file */
+ stream_write_change(action, s);
+ return true;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * XXX The publisher side doesn't always send relation/type update
+ * messages after the streaming transaction, so also update the
+ * relation/type in leader apply worker. See function
+ * cleanup_rel_sync_cache.
+ */
+ if (pa_send_data(winfo, s->len, s->data))
+ return (action != LOGICAL_REP_MSG_RELATION &&
+ action != LOGICAL_REP_MSG_TYPE);
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, false);
- return true;
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ stream_write_change(action, &original_msg);
+
+ /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
+ return (action != LOGICAL_REP_MSG_RELATION &&
+ action != LOGICAL_REP_MSG_TYPE);
+
+ case TRANS_PARALLEL_APPLY:
+ parallel_stream_nchanges += 1;
+
+ /* Define a savepoint for a subxact if needed. */
+ pa_start_subtrans(current_xid, stream_xid);
+ return false;
+
+ default:
+ Assert(false);
+ return false; /* silence compiler warning */
+ }
}
/*
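
Note: all TRANS_LEADER_SEND_TO_PARALLEL branches funnel through pa_send_data(),
whose key property is that it must not block indefinitely (the parallel apply
worker could itself be blocked on a lock held by the leader, so an unbounded
send could deadlock). A sketch of the expected shape, assuming winfo->mq_handle
is the leader-to-worker shm_mq and with illustrative timeout constants:

    #define SHM_SEND_RETRY_INTERVAL_MS  1000
    #define SHM_SEND_TIMEOUT_MS         (10 * SHM_SEND_RETRY_INTERVAL_MS)

    /* Sketch: bounded, non-blocking send to the parallel apply worker. */
    static bool
    pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    {
        TimestampTz startTime = 0;

        for (;;)
        {
            shm_mq_result result;

            /* nowait = true: never block inside the queue itself. */
            result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);

            if (result == SHM_MQ_SUCCESS)
                return true;
            if (result == SHM_MQ_DETACHED)
                ereport(ERROR,
                        (errcode(ERRCODE_CONNECTION_FAILURE),
                         errmsg("could not send data to shared-memory queue")));

            /* Queue is full; retry for a while, then give up. */
            if (startTime == 0)
                startTime = GetCurrentTimestamp();
            else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
                                                SHM_SEND_TIMEOUT_MS))
                return false;   /* caller switches to partial serialize mode */

            (void) WaitLatch(MyLatch,
                             WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                             SHM_SEND_RETRY_INTERVAL_MS, PG_WAIT_EXTENSION);
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }
    }
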
@@ -928,8 +1091,11 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
* called within the PrepareTransactionBlock below.
*/
- BeginTransactionBlock();
- CommitTransactionCommand(); /* Completes the preceding Begin command. */
+ if (!IsTransactionBlock())
+ {
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+ }
/*
* Update origin state so we can restart streaming from correct position
@@ -976,7 +1142,7 @@ apply_handle_prepare(StringInfo s)
CommitTransactionCommand();
pgstat_report_stat(false);
- store_flush_position(prepare_data.end_lsn);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
@@ -998,6 +1164,12 @@ apply_handle_prepare(StringInfo s)
/*
* Handle a COMMIT PREPARED of a previously PREPARED transaction.
+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. In that case, we have already waited for the prepare
+ * to finish in apply_handle_stream_prepare() which will ensure all the
+ * operations in that transaction have happened in the subscriber, so no
+ * concurrent transaction can cause deadlock or transaction dependency issues.
*/
static void
apply_handle_commit_prepared(StringInfo s)
@@ -1027,7 +1199,7 @@ apply_handle_commit_prepared(StringInfo s)
CommitTransactionCommand();
pgstat_report_stat(false);
- store_flush_position(prepare_data.end_lsn);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
@@ -1041,6 +1213,12 @@ apply_handle_commit_prepared(StringInfo s)
/*
* Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. In that case, we have already waited for the prepare
+ * to finish in apply_handle_stream_prepare() which will ensure all the
+ * operations in that transaction have happened in the subscriber, so no
+ * concurrent transaction can cause deadlock or transaction dependency issues.
*/
static void
apply_handle_rollback_prepared(StringInfo s)
@@ -1082,7 +1260,7 @@ apply_handle_rollback_prepared(StringInfo s)
pgstat_report_stat(false);
- store_flush_position(rollback_data.rollback_end_lsn);
+ store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
@@ -1094,15 +1272,16 @@ apply_handle_rollback_prepared(StringInfo s)
/*
* Handle STREAM PREPARE.
- *
- * Logic is in two parts:
- * 1. Replay all the spooled operations
- * 2. Mark the transaction as prepared
*/
static void
apply_handle_stream_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1118,24 +1297,98 @@ apply_handle_stream_prepare(StringInfo s)
logicalrep_read_stream_prepare(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
- elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
+ apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
- /* Replay all the spooled operations. */
- apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
- /* Mark the transaction as prepared. */
- apply_handle_prepare_internal(&prepare_data);
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */
+ apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
+ prepare_data.xid, prepare_data.prepare_lsn);
- CommitTransactionCommand();
+ /* Mark the transaction as prepared. */
+ apply_handle_prepare_internal(&prepare_data);
- pgstat_report_stat(false);
+ CommitTransactionCommand();
- store_flush_position(prepare_data.end_lsn);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
- in_remote_transaction = false;
+ in_remote_transaction = false;
+
+ /* Unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+
+ elog(DEBUG1, "finished processing the STREAM PREPARE command");
+ break;
- /* unlink the files with serialized changes and subxact info. */
- stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, prepare_data.end_lsn);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_open_and_write_change(prepare_data.xid,
+ LOGICAL_REP_MSG_STREAM_PREPARE,
+ &original_msg);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, prepare_data.end_lsn);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before preparing.
+ */
+ if (stream_fd)
+ stream_close_file();
+
+ begin_replication_step();
+
+ /* Mark the transaction as prepared. */
+ apply_handle_prepare_internal(&prepare_data);
+
+ end_replication_step();
+
+ CommitTransactionCommand();
+
+ MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
+ pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+
+ pa_reset_subtrans();
+
+ elog(DEBUG1, "finished processing the STREAM PREPARE command");
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
+
+ pgstat_report_stat(false);
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
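
Note: pa_xact_finish(), used by both the SEND_TO_PARALLEL and PARTIAL_SERIALIZE
paths above, is what actually enforces commit ordering: the leader waits for the
assigned worker before moving on. A sketch of its expected shape (pa_free_worker
is an assumed cleanup helper):

    /* Sketch: leader-side end-of-transaction handshake with the worker. */
    void
    pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    {
        Assert(am_leader_apply_worker());

        /* Let the worker proceed past the final stream lock. */
        pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);

        /*
         * Wait for the worker to reach PARALLEL_TRANS_FINISHED; this is what
         * preserves commit ordering across workers.
         */
        pa_wait_for_xact_finish(winfo);

        /* Record the flush position reported back by the worker. */
        if (!XLogRecPtrIsInvalid(remote_lsn))
            store_flush_position(remote_lsn, winfo->shared->last_commit_end);

        pa_free_worker(winfo);
    }
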
@@ -1173,27 +1426,64 @@ apply_handle_origin(StringInfo s)
}
/*
+ * Initialize fileset (if not already done).
+ *
+ * Create a new file when first_segment is true, otherwise open the existing
+ * file.
+ */
+void
+stream_start_internal(TransactionId xid, bool first_segment)
+{
+ begin_replication_step();
+
+ /*
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
+ */
+ if (!MyLogicalRepWorker->stream_fileset)
+ {
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+ FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+ MemoryContextSwitchTo(oldctx);
+ }
+
+ /* Open the spool file for this transaction. */
+ stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
+
+ /* If this is not the first segment, open existing subxact file. */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, xid);
+
+ end_replication_step();
+}
+
+/*
* Handle STREAM START message.
*/
static void
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
if (in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("duplicate STREAM START message")));
- /*
- * Start a transaction on stream start, this transaction will be committed
- * on the stream stop unless it is a tablesync worker in which case it
- * will be committed after processing all the messages. We need the
- * transaction for handling the buffile, used for serializing the
- * streaming data and subxact info.
- */
- begin_replication_step();
-
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true;
@@ -1207,54 +1497,119 @@ apply_handle_stream_start(StringInfo s)
set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
- /*
- * Initialize the worker's stream_fileset if we haven't yet. This will be
- * used for the entire duration of the worker so create it in a permanent
- * context. We create this on the very first streaming message from any
- * transaction and then use it for this and other streaming transactions.
- * Now, we could create a fileset at the start of the worker as well but
- * then we won't be sure that it will ever be used.
- */
- if (MyLogicalRepWorker->stream_fileset == NULL)
+ /* Try to allocate a worker for the streaming transaction. */
+ if (first_segment)
+ pa_allocate_worker(stream_xid);
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);
+
+ switch (apply_action)
{
- MemoryContext oldctx;
+ case TRANS_LEADER_SERIALIZE:
- oldctx = MemoryContextSwitchTo(ApplyContext);
+ /*
+ * Function stream_start_internal starts a transaction. This
+ * transaction will be committed on the stream stop unless it is a
+ * tablesync worker in which case it will be committed after
+ * processing all the messages. We need this transaction for
+ * handling the BufFile, used for serializing the streaming data
+ * and subxact info.
+ */
+ stream_start_internal(stream_xid, first_segment);
+ break;
- MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
- FileSetInit(MyLogicalRepWorker->stream_fileset);
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
- MemoryContextSwitchTo(oldctx);
- }
+ /*
+ * Once we start serializing the changes, the parallel apply worker
+ * will keep waiting for the leader to release the stream lock until
+ * the end of the transaction. So, we don't need to release the lock
+ * or increment the stream count in that case.
+ */
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /*
+ * Unlock the shared object lock so that the parallel apply
+ * worker can continue to receive changes.
+ */
+ if (!first_segment)
+ pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
- /* open the spool file for this transaction */
- stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+ /*
+ * Increment the number of streaming blocks waiting to be
+ * processed by parallel apply worker.
+ */
+ pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
- /* if this is not the first segment, open existing subxact file */
- if (!first_segment)
- subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+ /* Cache the parallel apply worker for this transaction. */
+ pa_set_stream_apply_worker(winfo);
+ break;
+ }
- pgstat_report_activity(STATE_RUNNING, NULL);
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, !first_segment);
- end_replication_step();
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ /*
+ * Open the spool file unless it was already opened when switching
+ * to serialize mode. The transaction started in
+ * stream_start_internal will be committed on the stream stop.
+ */
+ if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
+ stream_start_internal(stream_xid, first_segment);
+
+ stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
+
+ /* Cache the parallel apply worker for this transaction. */
+ pa_set_stream_apply_worker(winfo);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+ if (first_segment)
+ {
+ /* Hold the lock until the end of the transaction. */
+ pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
+
+ /*
+ * Signal the leader apply worker, as it may be waiting for
+ * us.
+ */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ }
+
+ parallel_stream_nchanges = 0;
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
}
/*
- * Handle STREAM STOP message.
+ * Update the information about subxacts and close the file.
+ *
+ * This function should only be called after stream_start_internal has been
+ * called.
*/
-static void
-apply_handle_stream_stop(StringInfo s)
+void
+stream_stop_internal(TransactionId xid)
{
- if (!in_streamed_transaction)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM STOP message without STREAM START")));
-
/*
- * Close the file with serialized changes, and serialize information about
- * subxacts for the toplevel transaction.
+ * Serialize information about subxacts for the toplevel transaction, then
+ * close the stream messages spool file.
*/
- subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
stream_close_file();
/* We must be in a valid transaction state */
@@ -1263,40 +1618,124 @@ apply_handle_stream_stop(StringInfo s)
/* Commit the per-stream transaction */
CommitTransactionCommand();
- in_streamed_transaction = false;
-
/* Reset per-stream context */
MemoryContextReset(LogicalStreamingContext);
-
- pgstat_report_activity(STATE_IDLE, NULL);
- reset_apply_error_context_info();
}
/*
- * Handle STREAM abort message.
+ * Handle STREAM STOP message.
*/
static void
-apply_handle_stream_abort(StringInfo s)
+apply_handle_stream_stop(StringInfo s)
{
- TransactionId xid;
- TransactionId subxid;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
- if (in_streamed_transaction)
+ if (!in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM ABORT message without STREAM STOP")));
+ errmsg_internal("STREAM STOP message without STREAM START")));
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);
+
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
+ stream_stop_internal(stream_xid);
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * Lock before sending the STREAM_STOP message so that the leader
+ * can hold the lock first and the parallel apply worker will wait
+ * for leader to release the lock. See Locking Considerations atop
+ * applyparallelworker.c.
+ */
+ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ pa_set_stream_apply_worker(NULL);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
+ stream_stop_internal(stream_xid);
+ pa_set_stream_apply_worker(NULL);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+ elog(DEBUG1, "applied %u changes in the streaming chunk",
+ parallel_stream_nchanges);
+
+ /*
+ * By the time parallel apply worker is processing the changes in
+ * the current streaming block, the leader apply worker may have
+ * sent multiple streaming blocks. This can lead to the parallel apply
+ * worker starting to wait even when there are more chunks of streams
+ * in the queue. So, try to lock only if there is no message left
+ * in the queue. See Locking Considerations atop
+ * applyparallelworker.c.
+ *
+ * Note that here we have a race condition where we can start
+ * waiting even when there are pending streaming chunks. This can
+ * happen if the leader sends another streaming block and acquires
+ * the stream lock again after the parallel apply worker checks
+ * that there is no pending streaming block and before it actually
+ * starts waiting on a lock. We can handle this case by not
+ * allowing the leader to increment the stream block count during
+ * the time the parallel apply worker acquires the lock, but it is
+ * not clear whether that is worth the complexity.
+ *
+ * Now, if this missed chunk contains a rollback to a savepoint,
+ * then there is a risk of deadlock, which probably shouldn't
+ * happen after a restart.
+ */
+ pa_decr_and_wait_stream_block();
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
+
+ in_streamed_transaction = false;
- logicalrep_read_stream_abort(s, &xid, &subxid);
+ /*
+ * The parallel apply worker could be in a transaction in which case we
+ * need to report the state as STATE_IDLEINTRANSACTION.
+ */
+ if (IsTransactionOrTransactionBlock())
+ pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
+ else
+ pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
+}
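+
Note: the worker-side half of the protocol described in the TRANS_PARALLEL_APPLY
case above is pa_decr_and_wait_stream_block(): consume one unit of
pending_stream_count, and only if it drops to zero park on the stream lock until
the leader releases it. A simplified sketch (the real function also has to cope
with spooled-message replay):

    void
    pa_decr_and_wait_stream_block(void)
    {
        Assert(am_parallel_apply_worker());

        /*
         * Wait only if no further chunk is already queued up; otherwise just
         * consume one unit of the counter and continue.
         */
        if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
        {
            /* Blocks while the leader holds the lock in exclusive mode. */
            pa_lock_stream(MyParallelShared->xid, AccessShareLock);
            pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
        }
    }
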
+
+/*
+ * Helper function to handle STREAM ABORT message when the transaction was
+ * serialized to file.
+ */
+static void
+stream_abort_internal(TransactionId xid, TransactionId subxid)
+{
/*
* If the two XIDs are the same, it's in fact abort of toplevel xact, so
* just delete the files with serialized info.
*/
if (xid == subxid)
- {
- set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
- }
else
{
/*
@@ -1320,8 +1759,6 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
-
subidx = -1;
begin_replication_step();
subxact_info_read(MyLogicalRepWorker->subid, xid);
@@ -1346,7 +1783,6 @@ apply_handle_stream_abort(StringInfo s)
cleanup_subxact_info();
end_replication_step();
CommitTransactionCommand();
- reset_apply_error_context_info();
return;
}
@@ -1369,24 +1805,215 @@ apply_handle_stream_abort(StringInfo s)
end_replication_step();
CommitTransactionCommand();
}
+}
+
+/*
+ * Handle STREAM ABORT message.
+ */
+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+ LogicalRepStreamAbortData abort_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
+ bool toplevel_xact;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM ABORT message without STREAM STOP")));
+
+ /* We receive abort information only when we can apply in parallel. */
+ logicalrep_read_stream_abort(s, &abort_data,
+ MyLogicalRepWorker->parallel_apply);
+
+ xid = abort_data.xid;
+ subxid = abort_data.subxid;
+ toplevel_xact = (xid == subxid);
+
+ set_apply_error_context_xact(subxid, abort_data.abort_lsn);
+
+ apply_action = get_transaction_apply_action(xid, &winfo);
+
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
+
+ /*
+ * We are in the leader apply worker and the transaction has been
+ * serialized to file.
+ */
+ stream_abort_internal(xid, subxid);
+
+ elog(DEBUG1, "finished processing the STREAM ABORT command");
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * For the case of aborting the subtransaction, we increment the
+ * number of streaming blocks and take the lock again before
+ * sending the STREAM_ABORT to ensure that the parallel apply
+ * worker will wait on the lock for the next set of changes after
+ * processing the STREAM_ABORT message if it is not already
+ * waiting for STREAM_STOP message.
+ *
+ * It is important to perform this locking before sending the
+ * STREAM_ABORT message so that the leader can hold the lock first
+ * and the parallel apply worker will wait for the leader to
+ * release the lock. This is the same as what we do in
+ * apply_handle_stream_stop. See Locking Considerations atop
+ * applyparallelworker.c.
+ */
+ if (!toplevel_xact)
+ {
+ pa_unlock_stream(xid, AccessExclusiveLock);
+ pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
+ pa_lock_stream(xid, AccessExclusiveLock);
+ }
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /*
+ * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
+ * wait here for the parallel apply worker to finish as that
+ * is not required to maintain the commit order and won't have
+ * the risk of failures due to transaction dependencies and
+ * deadlocks. However, it is possible that before the parallel
+ * worker finishes and we clear the worker info, the xid
+ * wraparound happens on the upstream and a new transaction
+ * with the same xid can appear and that can lead to duplicate
+ * entries in ParallelApplyTxnHash. Yet another problem could
+ * be that we may have serialized the changes in partial
+ * serialize mode and the file containing xact changes may
+ * already exist, and after xid wraparound trying to create
+ * the file for the same xid can lead to an error. To avoid
+ * these problems, we decide to wait for the aborts to finish.
+ *
+ * Note, it is okay to not update the flush location position
+ * for aborts as in worst case that means such a transaction
+ * won't be sent again after restart.
+ */
+ if (toplevel_xact)
+ pa_xact_finish(winfo, InvalidXLogRecPtr);
+
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ /*
+ * Parallel apply worker might have applied some changes, so write
+ * the STREAM_ABORT message so that it can rollback the
+ * subtransaction if needed.
+ */
+ stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
+ &original_msg);
+
+ if (toplevel_xact)
+ {
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+ pa_xact_finish(winfo, InvalidXLogRecPtr);
+ }
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before aborting.
+ */
+ if (toplevel_xact && stream_fd)
+ stream_close_file();
+
+ pa_stream_abort(&abort_data);
+
+ /*
+ * We need to wait after processing rollback to savepoint for the
+ * next set of changes.
+ *
+ * We have a race condition here due to which we can start waiting
+ * here when there are more chunk of streams in the queue. See
+ * apply_handle_stream_stop.
+ */
+ if (!toplevel_xact)
+ pa_decr_and_wait_stream_block();
+
+ elog(DEBUG1, "finished processing the STREAM ABORT command");
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
reset_apply_error_context_info();
}
/*
- * Common spoolfile processing.
+ * Ensure that the passed location is fileset's end.
*/
static void
-apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
+ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
+ off_t offset)
+{
+ char path[MAXPGPATH];
+ BufFile *fd;
+ int last_fileno;
+ off_t last_offset;
+
+ Assert(!IsTransactionState());
+
+ begin_replication_step();
+
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+
+ fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+
+ BufFileSeek(fd, 0, 0, SEEK_END);
+ BufFileTell(fd, &last_fileno, &last_offset);
+
+ BufFileClose(fd);
+
+ end_replication_step();
+
+ if (last_fileno != fileno || last_offset != offset)
+ elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
+ path);
+}
+
+/*
+ * Common spoolfile processing.
+ */
+void
+apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
+ XLogRecPtr lsn)
{
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
MemoryContext oldcxt;
- BufFile *fd;
+ ResourceOwner oldowner;
+ int fileno;
+ off_t offset;
- maybe_start_skipping_changes(lsn);
+ if (!am_parallel_apply_worker())
+ maybe_start_skipping_changes(lsn);
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1402,8 +2029,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ /*
+ * Make sure the file is owned by the toplevel transaction so that the
+ * file will not be accidentally closed when aborting a subtransaction.
+ */
+ oldowner = CurrentResourceOwner;
+ CurrentResourceOwner = TopTransactionResourceOwner;
+
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+
+ CurrentResourceOwner = oldowner;
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -1434,7 +2069,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
CHECK_FOR_INTERRUPTS();
/* read length of the on-disk record */
- nbytes = BufFileRead(fd, &len, sizeof(len));
+ nbytes = BufFileRead(stream_fd, &len, sizeof(len));
/* have we reached end of the file? */
if (nbytes == 0)
@@ -1455,12 +2090,14 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
buffer = repalloc(buffer, len);
/* and finally read the data into the buffer */
- if (BufFileRead(fd, buffer, len) != len)
+ if (BufFileRead(stream_fd, buffer, len) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
+ BufFileTell(stream_fd, &fileno, &offset);
+
/* copy the buffer to the stringinfo and call apply_dispatch */
resetStringInfo(&s2);
appendBinaryStringInfo(&s2, buffer, len);
@@ -1476,15 +2113,24 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
nchanges++;
+ /*
+ * It is possible the file has been closed because we processed a
+ * transaction end message such as stream_commit, in which case that
+ * must be the last message.
+ */
+ if (!stream_fd)
+ {
+ ensure_last_message(stream_fileset, xid, fileno, offset);
+ break;
+ }
+
if (nchanges % 1000 == 0)
elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);
}
- BufFileClose(fd);
-
- pfree(buffer);
- pfree(s2.data);
+ if (stream_fd)
+ stream_close_file();
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
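The read loop above relies on simple framing: each spooled record is a length word followed by that many bytes of change data (the first byte being the action type). A minimal sketch of the same protocol using plain stdio; hypothetical and simplified, where the real code uses BufFileRead and apply_dispatch:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stdio analogue of the spool-file replay loop above. */
static void
replay_spool_file(const char *path)
{
    FILE   *fp = fopen(path, "rb");
    int     len;

    if (fp == NULL)
        return;

    /* read length of the on-disk record; clean EOF only at a boundary */
    while (fread(&len, sizeof(len), 1, fp) == 1)
    {
        char   *buf;

        if (len <= 0)
            break;              /* defensive: records are never empty */

        buf = malloc(len);

        /* a short read of the payload means the file is corrupt */
        if (buf == NULL || fread(buf, 1, len, fp) != (size_t) len)
        {
            free(buf);
            break;
        }

        /* ... hand the buffer to the dispatcher (apply_dispatch above) ... */
        free(buf);
    }

    fclose(fp);
}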
@@ -1500,6 +2146,11 @@ apply_handle_stream_commit(StringInfo s)
{
TransactionId xid;
LogicalRepCommitData commit_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1509,14 +2160,85 @@ apply_handle_stream_commit(StringInfo s)
xid = logicalrep_read_stream_commit(s, &commit_data);
set_apply_error_context_xact(xid, commit_data.commit_lsn);
- elog(DEBUG1, "received commit for streamed transaction %u", xid);
+ apply_action = get_transaction_apply_action(xid, &winfo);
- apply_spooled_messages(xid, commit_data.commit_lsn);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
- apply_handle_commit_internal(&commit_data);
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */
+ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
+ commit_data.commit_lsn);
+
+ apply_handle_commit_internal(&commit_data);
+
+ /* Unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+ elog(DEBUG1, "finished processing the STREAM COMMIT command");
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, commit_data.end_lsn);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to the parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
+ &original_msg);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, commit_data.end_lsn);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before committing.
+ */
+ if (stream_fd)
+ stream_close_file();
+
+ apply_handle_commit_internal(&commit_data);
+
+ MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+ /*
+ * It is important to set the transaction state as finished before
+ * releasing the lock. See pa_wait_for_xact_finish.
+ */
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
+ pa_unlock_transaction(xid, AccessExclusiveLock);
- /* unlink the files with serialized changes and subxact info */
- stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ pa_reset_subtrans();
+
+ elog(DEBUG1, "finished processing the STREAM COMMIT command");
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
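The dispatch above is driven by a small set of apply actions. A sketch of what the TransApplyAction enum plausibly looks like, inferred from its uses in this file; the authoritative definition lives elsewhere in worker.c:

typedef enum
{
    TRANS_LEADER_APPLY,             /* leader applies the change itself */
    TRANS_LEADER_SERIALIZE,         /* leader spools the change to a file */
    TRANS_LEADER_SEND_TO_PARALLEL,  /* leader ships the change via shared
                                     * memory to a parallel apply worker */
    TRANS_LEADER_PARTIAL_SERIALIZE, /* queue unusable: spool the remaining
                                     * changes to a file instead */
    TRANS_PARALLEL_APPLY            /* parallel apply worker applies it */
} TransApplyAction;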
@@ -1560,9 +2282,16 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
replorigin_session_origin_timestamp = commit_data->committime;
CommitTransactionCommand();
+
+ if (IsTransactionBlock())
+ {
+ EndTransactionBlock(false);
+ CommitTransactionCommand();
+ }
+
pgstat_report_stat(false);
- store_flush_position(commit_data->end_lsn);
+ store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
}
else
{
@@ -2508,7 +3237,7 @@ apply_handle_truncate(StringInfo s)
/*
* Logical replication protocol message dispatcher.
*/
-static void
+void
apply_dispatch(StringInfo s)
{
LogicalRepMsgType action = pq_getmsgbyte(s);
@@ -2672,17 +3401,24 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
/*
* Store current remote/local lsn pair in the tracking list.
*/
-static void
-store_flush_position(XLogRecPtr remote_lsn)
+void
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
{
FlushPosition *flushpos;
+ /*
+ * Skip for parallel apply workers, because the lsn_mapping is maintained
+ * by the leader apply worker.
+ */
+ if (am_parallel_apply_worker())
+ return;
+
/* Need to do this in permanent context */
MemoryContextSwitchTo(ApplyContext);
/* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
- flushpos->local_end = XactLastCommitEnd;
+ flushpos->local_end = local_lsn;
flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node);
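For context, a hedged sketch of how the queued (remote_end, local_end) pairs are later drained: entries whose local commit LSN has already been flushed can be reported to the publisher as flushed. The real consumer is get_flush_position() earlier in this file; the helper below is simplified and hypothetical:

static XLogRecPtr
latest_flushed_remote_lsn(XLogRecPtr local_flush)
{
    XLogRecPtr  flushed = InvalidXLogRecPtr;
    dlist_mutable_iter iter;

    dlist_foreach_modify(iter, &lsn_mapping)
    {
        FlushPosition *pos = dlist_container(FlushPosition, node, iter.cur);

        /* stop at the first entry that is not yet locally durable */
        if (pos->local_end > local_flush)
            break;

        flushed = pos->remote_end;
        dlist_delete(iter.cur);
        pfree(pos);
    }

    return flushed;
}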
@@ -2741,6 +3477,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
errcallback.callback = apply_error_callback;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
+ apply_error_context_stack = error_context_stack;
/* This outer loop iterates once per wait. */
for (;;)
@@ -2955,6 +3692,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+ apply_error_context_stack = error_context_stack;
/* All done */
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
@@ -3053,9 +3791,31 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
}
/*
- * Reread subscription info if needed. Most changes will be exit.
+ * Exit routine for apply workers due to subscription parameter changes.
*/
static void
+apply_worker_exit(void)
+{
+ if (am_parallel_apply_worker())
+ {
+ /*
+ * Don't stop the parallel apply worker as the leader will detect the
+ * subscription parameter change and restart logical replication later
+ * anyway. This also prevents the leader from reporting errors when
+ * trying to communicate with a stopped parallel apply worker, which
+ * would accidentally disable subscriptions if disable_on_error was
+ * set.
+ */
+ return;
+ }
+
+ proc_exit(0);
+}
+
+/*
+ * Reread subscription info if needed.  Most changes will result in the
+ * apply worker exiting.
+ */
+void
maybe_reread_subscription(void)
{
MemoryContext oldctx;
@@ -3085,9 +3845,9 @@ maybe_reread_subscription(void)
if (!newsub)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was removed",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
+ get_worker_name(), MySubscription->name)));
proc_exit(0);
}
@@ -3096,11 +3856,11 @@ maybe_reread_subscription(void)
if (!newsub->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was disabled",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
}
/* !slotname should never happen when enabled is true. */
@@ -3111,7 +3871,10 @@ maybe_reread_subscription(void)
/*
* Exit if any parameter that affects the remote connection was changed.
- * The launcher will start a new worker.
+ * The launcher will start a new worker but note that the parallel apply
+ * worker won't restart if the streaming option's value is changed from
+ * 'parallel' to any other value, or if the server decides not to stream
+ * the in-progress transaction.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
strcmp(newsub->name, MySubscription->name) != 0 ||
@@ -3122,11 +3885,17 @@ maybe_reread_subscription(void)
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
- MySubscription->name)));
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
}
/* Check for other changes that should never happen too. */
@@ -3378,7 +4147,7 @@ changes_filename(char *path, Oid subid, TransactionId xid)
* toplevel transaction. Each subscription has a separate set of files
* for any toplevel transaction.
*/
-static void
+void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
@@ -3401,9 +4170,6 @@ stream_cleanup_files(Oid subid, TransactionId xid)
* by stream_xid (global variable). If it's the first chunk of streamed
* changes for this transaction, create the buffile, otherwise open the
* previously created file.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
*/
static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
@@ -3411,7 +4177,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
char path[MAXPGPATH];
MemoryContext oldcxt;
- Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
@@ -3450,15 +4215,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
/*
* stream_close_file
* Close the currently open file with streamed changes.
- *
- * This can only be called at the end of a streaming block, i.e. at stream_stop
- * message from the upstream.
*/
static void
stream_close_file(void)
{
- Assert(in_streamed_transaction);
- Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
BufFileClose(stream_fd);
@@ -3480,8 +4240,6 @@ stream_write_change(char action, StringInfo s)
{
int len;
- Assert(in_streamed_transaction);
- Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
/* total on-disk size, including the action type character */
@@ -3500,6 +4258,26 @@ stream_write_change(char action, StringInfo s)
}
/*
+ * stream_open_and_write_change
+ * Serialize a message to a file for the given transaction.
+ *
+ * This function is similar to stream_write_change except that it will open
+ * the target file if it is not already open before writing the message, and
+ * close the file at the end.
+ */
+static void
+stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
+{
+ Assert(!in_streamed_transaction);
+
+ if (!stream_fd)
+ stream_start_internal(xid, false);
+
+ stream_write_change(action, s);
+ stream_stop_internal(xid);
+}
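Usage example, taken from apply_handle_stream_commit above: in partial serialize mode the leader spools the commit record itself before flipping the fileset state:

    stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
                                 &original_msg);
    pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);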
+
+/*
* Cleanup the memory for subxacts and reset the related variables.
*/
static inline void
@@ -3610,37 +4388,16 @@ start_apply(XLogRecPtr origin_startpos)
PG_END_TRY();
}
-/* Logical Replication Apply worker entry point */
+/*
+ * Common initialization for leader apply worker and parallel apply worker.
+ *
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
{
- int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos = InvalidXLogRecPtr;
- char *myslotname = NULL;
- WalRcvStreamOptions options;
- int server_version;
-
- /* Attach to slot */
- logicalrep_worker_attach(worker_slot);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGTERM, die);
- BackgroundWorkerUnblockSignals();
-
- /*
- * We don't currently need any ResourceOwner in a walreceiver process, but
- * if we did, we could call CreateAuxProcessResourceOwner here.
- */
-
- /* Initialise stats to a sanish value */
- MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
- MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
/* Run as replica session replication role. */
SetConfigOption("session_replication_role", "replica",
@@ -3668,9 +4425,10 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription %u will not "
- "start because the subscription was removed during startup",
- MyLogicalRepWorker->subid)));
+ /* translator: %s is the name of logical replication worker */
+ (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
+ get_worker_name(), MyLogicalRepWorker->subid)));
+
proc_exit(0);
}
@@ -3680,11 +4438,11 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
}
/* Setup synchronous commit according to the user's wishes */
@@ -3699,13 +4457,49 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
- MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
else
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" has started",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" has started",
+ get_worker_name(), MySubscription->name)));
CommitTransactionCommand();
+}
+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *myslotname = NULL;
+ WalRcvStreamOptions options;
+ int server_version;
+
+ /* Attach to slot */
+ logicalrep_worker_attach(worker_slot);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */
+
+ /* Initialise stats to a sane value */
+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ InitializeApplyWorker();
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
@@ -3715,20 +4509,15 @@ ApplyWorkerMain(Datum main_arg)
{
start_table_sync(&origin_startpos, &myslotname);
- /*
- * Allocate the origin name in long-lived context for error context
- * message.
- */
ReplicationOriginNameForLogicalRep(MySubscription->oid,
MyLogicalRepWorker->relid,
originname,
sizeof(originname));
- apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
- originname);
+ set_apply_error_context_origin(originname);
}
else
{
- /* This is main apply worker */
+ /* This is the leader apply worker */
RepOriginId originid;
TimeLineID startpointTLI;
char *err;
@@ -3752,7 +4541,7 @@ ApplyWorkerMain(Datum main_arg)
originid = replorigin_by_name(originname, true);
if (!OidIsValid(originid))
originid = replorigin_create(originname);
- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
@@ -3770,12 +4559,7 @@ ApplyWorkerMain(Datum main_arg)
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
- /*
- * Allocate the origin name in long-lived context for error context
- * message.
- */
- apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
- originname);
+ set_apply_error_context_origin(originname);
}
/*
@@ -3793,13 +4577,36 @@ ApplyWorkerMain(Datum main_arg)
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
+ server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
- options.proto.logical.streaming = MySubscription->stream;
+
+ /*
+ * Assign the appropriate value for the streaming option according to
+ * the subscription's 'streaming' mode and the publisher's ability to
+ * support that mode.
+ */
+ if (server_version >= 160000 &&
+ MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+ {
+ options.proto.logical.streaming_str = "parallel";
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != LOGICALREP_STREAM_OFF)
+ {
+ options.proto.logical.streaming_str = "on";
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+ else
+ {
+ options.proto.logical.streaming_str = NULL;
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+
options.proto.logical.twophase = false;
options.proto.logical.origin = pstrdup(MySubscription->origin);
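The streaming negotiation above can be read as a small pure function; a hypothetical refactoring, for illustration only and not part of worker.c:

/* Hypothetical helper returning the streaming_str to request, or NULL. */
static const char *
choose_streaming_mode(int server_version, char stream_mode)
{
    /* 'parallel' requires a v16+ publisher */
    if (server_version >= 160000 &&
        stream_mode == LOGICALREP_STREAM_PARALLEL)
        return "parallel";

    /* plain streaming requires a v14+ publisher */
    if (server_version >= 140000 &&
        stream_mode != LOGICALREP_STREAM_OFF)
        return "on";

    /* pre-14 publisher, or streaming = off */
    return NULL;
}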
@@ -3897,6 +4704,15 @@ IsLogicalWorker(void)
}
/*
+ * Is current process a logical replication parallel apply worker?
+ */
+bool
+IsLogicalParallelApplyWorker(void)
+{
+ return IsLogicalWorker() && am_parallel_apply_worker();
+}
+
+/*
* Start skipping changes of the transaction if the given LSN matches the
* LSN specified by subscription's skiplsn.
*/
@@ -3958,7 +4774,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
XLogRecPtr myskiplsn = MySubscription->skiplsn;
bool started_tx = false;
- if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
return;
if (!IsTransactionState())
@@ -4030,7 +4846,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
}
/* Error callback to give more context info about the change being applied */
-static void
+void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
@@ -4058,23 +4874,47 @@ apply_error_callback(void *arg)
errarg->remote_xid,
LSN_FORMAT_ARGS(errarg->finish_lsn));
}
- else if (errarg->remote_attnum < 0)
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
- errarg->origin_name,
- logicalrep_message_type(errarg->command),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname,
- errarg->remote_xid,
- LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
- errarg->origin_name,
- logicalrep_message_type(errarg->command),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname,
- errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid,
- LSN_FORMAT_ARGS(errarg->finish_lsn));
+ {
+ if (errarg->remote_attnum < 0)
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ else
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ }
}
/* Set transaction information of apply error callback */
@@ -4144,3 +4984,53 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
/* The List storage will be reclaimed automatically in xact cleanup. */
on_commit_wakeup_workers_subids = NIL;
}
+
+/*
+ * Allocate the origin name in long-lived context for error context message.
+ */
+void
+set_apply_error_context_origin(char *originname)
+{
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
+}
+
+/*
+ * Return the action to be taken for the given transaction. *winfo is
+ * assigned to the destination parallel worker info when the leader apply
+ * worker has to pass all the transaction's changes to the parallel apply
+ * worker.
+ */
+static TransApplyAction
+get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
+{
+ *winfo = NULL;
+
+ if (am_parallel_apply_worker())
+ {
+ return TRANS_PARALLEL_APPLY;
+ }
+ else if (in_remote_transaction)
+ {
+ return TRANS_LEADER_APPLY;
+ }
+
+ /*
+ * Check if we are processing this transaction using a parallel apply
+ * worker.
+ */
+ *winfo = pa_find_worker(xid);
+
+ if (!*winfo)
+ {
+ return TRANS_LEADER_SERIALIZE;
+ }
+ else if ((*winfo)->serialize_changes)
+ {
+ return TRANS_LEADER_PARTIAL_SERIALIZE;
+ }
+ else
+ {
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+}
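For orientation, the mapping implemented by get_transaction_apply_action() can be summarized as follows:

/*
 * Summary of get_transaction_apply_action():
 *
 *   parallel apply worker ........................... TRANS_PARALLEL_APPLY
 *   leader, ordinary (non-streamed) transaction ..... TRANS_LEADER_APPLY
 *   leader, streamed, no parallel worker assigned ... TRANS_LEADER_SERIALIZE
 *   leader, streamed, worker in serialize mode ...... TRANS_LEADER_PARTIAL_SERIALIZE
 *   leader, streamed, worker reachable via queue .... TRANS_LEADER_SEND_TO_PARALLEL
 */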
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 876adab38e5..19c10c028f9 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -18,6 +18,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
+#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
@@ -290,7 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool origin_option_given = false;
data->binary = false;
- data->streaming = false;
+ data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
@@ -369,7 +370,7 @@ parse_output_parameters(List *options, PGOutputData *data)
errmsg("conflicting or redundant options")));
streaming_given = true;
- data->streaming = defGetBoolean(defel);
+ data->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
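defGetStreamingMode() comes from commands/subscriptioncmds.h, included above. A hedged sketch of the kind of parsing it performs; names and error wording here are illustrative, not the actual implementation:

/* Hypothetical parser mapping a streaming option value to a mode byte. */
static char
parse_streaming_mode(const char *value)
{
    if (pg_strcasecmp(value, "parallel") == 0)
        return LOGICALREP_STREAM_PARALLEL;
    if (pg_strcasecmp(value, "on") == 0 || pg_strcasecmp(value, "true") == 0)
        return LOGICALREP_STREAM_ON;
    if (pg_strcasecmp(value, "off") == 0 || pg_strcasecmp(value, "false") == 0)
        return LOGICALREP_STREAM_OFF;

    ereport(ERROR,
            (errcode(ERRCODE_SYNTAX_ERROR),
             errmsg("%s requires a Boolean value or \"parallel\"",
                    "streaming")));
    return LOGICALREP_STREAM_OFF;   /* keep compiler quiet */
}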
@@ -461,13 +462,20 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
* we only allow it with sufficient version of the protocol, and when
* the output plugin supports it.
*/
- if (!data->streaming)
+ if (data->streaming == LOGICALREP_STREAM_OFF)
ctx->streaming = false;
- else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+ else if (data->streaming == LOGICALREP_STREAM_ON &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
else if (!ctx->streaming)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1841,6 +1849,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
/*
* The abort should happen outside streaming block, even for streamed
@@ -1854,7 +1864,9 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
+ txn->xact_time.abort_time, write_abort_info);
+
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);
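The extended call above carries the abort LSN and timestamp only when the subscriber requested parallel streaming. A plausible shape for the writer, matching the call site; the authoritative implementation is logicalrep_write_stream_abort() in proto.c, so this is only a sketch:

static void
write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid,
                   XLogRecPtr abort_lsn, TimestampTz abort_time,
                   bool write_abort_info)
{
    pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
    pq_sendint32(out, xid);
    pq_sendint32(out, subxid);

    /* extra fields only when the subscriber can use them (parallel apply) */
    if (write_abort_info)
    {
        pq_sendint64(out, abort_lsn);
        pq_sendint64(out, abort_time);
    }
}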