aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rwxr-xr-x[-rw-r--r--]src/backend/replication/logical/worker.c1354
1 files changed, 1122 insertions, 232 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8649e142c3..79cda394453 100644..100755
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -22,8 +22,12 @@
* STREAMED TRANSACTIONS
* ---------------------
* Streamed transactions (large transactions exceeding a memory limit on the
- * upstream) are not applied immediately, but instead, the data is written
- * to temporary files and then applied at once when the final commit arrives.
+ * upstream) are applied using one of two approaches:
+ *
+ * 1) Write to temporary files and apply when the final commit arrives
+ *
+ * This approach is used when the user has set the subscription's streaming
+ * option as on.
*
* Unlike the regular (non-streamed) case, handling streamed transactions has
* to handle aborts of both the toplevel transaction and subtransactions. This
@@ -50,6 +54,12 @@
* the file we desired across multiple stream-open calls for the same
* transaction.
*
+ * 2) Parallel apply workers.
+ *
+ * This approach is used when the user has set the subscription's streaming
+ * option as parallel. See logical/applyparallelworker.c for information about
+ * this approach.
+ *
* TWO_PHASE TRANSACTIONS
* ----------------------
* Two phase transactions are replayed at prepare and then committed or
@@ -233,7 +243,52 @@ typedef struct ApplyErrorCallbackArg
char *origin_name;
} ApplyErrorCallbackArg;
-static ApplyErrorCallbackArg apply_error_callback_arg =
+/*
+ * The action to be taken for the changes in the transaction.
+ *
+ * TRANS_LEADER_APPLY:
+ * This action means that we are in the leader apply worker and changes of the
+ * transaction are applied directly by the worker.
+ *
+ * TRANS_LEADER_SERIALIZE:
+ * This action means that we are in the leader apply worker or table sync
+ * worker. Changes are written to temporary files and then applied when the
+ * final commit arrives.
+ *
+ * TRANS_LEADER_SEND_TO_PARALLEL:
+ * This action means that we are in the leader apply worker and need to send
+ * the changes to the parallel apply worker.
+ *
+ * TRANS_LEADER_PARTIAL_SERIALIZE:
+ * This action means that we are in the leader apply worker and have sent some
+ * changes directly to the parallel apply worker and the remaining changes are
+ * serialized to a file, due to timeout while sending data. The parallel apply
+ * worker will apply these serialized changes when the final commit arrives.
+ *
+ * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
+ * serializing changes, the leader worker also needs to serialize the
+ * STREAM_XXX message to a file, and wait for the parallel apply worker to
+ * finish the transaction when processing the transaction finish command. So
+ * this new action was introduced to keep the code and logic clear.
+ *
+ * TRANS_PARALLEL_APPLY:
+ * This action means that we are in the parallel apply worker and changes of
+ * the transaction are applied directly by the worker.
+ */
+typedef enum
+{
+ /* The action for non-streaming transactions. */
+ TRANS_LEADER_APPLY,
+
+ /* Actions for streaming transactions. */
+ TRANS_LEADER_SERIALIZE,
+ TRANS_LEADER_SEND_TO_PARALLEL,
+ TRANS_LEADER_PARTIAL_SERIALIZE,
+ TRANS_PARALLEL_APPLY
+} TransApplyAction;
+
+/* errcontext tracker */
+ApplyErrorCallbackArg apply_error_callback_arg =
{
.command = 0,
.rel = NULL,
@@ -243,7 +298,9 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.origin_name = NULL,
};
-static MemoryContext ApplyMessageContext = NULL;
+ErrorContextCallback *apply_error_context_stack = NULL;
+
+MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
/* per stream context for streaming transactions */
@@ -265,16 +322,25 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
/*
+ * The number of changes applied by parallel apply worker during one streaming
+ * block.
+ */
+static uint32 parallel_stream_nchanges = 0;
+
+/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
* Once we start skipping changes, we don't stop it until we skip all changes of
* the transaction even if pg_subscription is updated and MySubscription->skiplsn
- * gets changed or reset during that. Also, in streaming transaction cases, we
- * don't skip receiving and spooling the changes since we decide whether or not
+ * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
+ * we don't skip receiving and spooling the changes since we decide whether or not
* to skip applying the changes when starting to apply changes. The subskiplsn is
* cleared after successfully skipping the transaction or applying non-empty
* transaction. The latter prevents the mistakenly specified subskiplsn from
- * being left.
+ * being left. Note that we cannot skip the streaming transactions when using
+ * parallel apply workers because we cannot get the finish LSN before applying
+ * the changes. So, we don't start parallel apply worker when finish LSN is set
+ * by the user.
*/
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
@@ -314,23 +380,16 @@ static inline void cleanup_subxact_info(void);
/*
* Serialize and deserialize changes for a toplevel transaction.
*/
-static void stream_cleanup_files(Oid subid, TransactionId xid);
static void stream_open_file(Oid subid, TransactionId xid,
bool first_segment);
static void stream_write_change(char action, StringInfo s);
+static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
-static void store_flush_position(XLogRecPtr remote_lsn);
-
-static void maybe_reread_subscription(void);
-
static void DisableSubscriptionAndExit(void);
-/* prototype needed because of stream_commit */
-static void apply_dispatch(StringInfo s);
-
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -354,19 +413,32 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
-/* Common streaming function to apply all the spooled messages */
-static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
-
/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
/* Functions for apply error callback */
-static void apply_error_callback(void *arg);
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
+static TransApplyAction get_transaction_apply_action(TransactionId xid,
+ ParallelApplyWorkerInfo **winfo);
+
+/*
+ * Return the name of the logical replication worker.
+ */
+static const char *
+get_worker_name(void)
+{
+ if (am_tablesync_worker())
+ return _("logical replication table synchronization worker");
+ else if (am_parallel_apply_worker())
+ return _("logical replication parallel apply worker");
+ else
+ return _("logical replication apply worker");
+}
+
/*
* Form the origin name for the subscription.
*
@@ -396,19 +468,43 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
*
* This is mainly needed for initial relation data sync as that runs in
* separate worker process running in parallel and we need some way to skip
- * changes coming to the main apply worker during the sync of a table.
+ * changes coming to the leader apply worker during the sync of a table.
*
* Note we need to do smaller or equals comparison for SYNCDONE state because
* it might hold position of end of initial slot consistent point WAL
* record + 1 (ie start of next record) and next record can be COMMIT of
* transaction we are now processing (which is what we set remote_final_lsn
* to in apply_handle_begin).
+ *
+ * Note that for streaming transactions that are being applied in the parallel
+ * apply worker, we disallow applying changes if the target table in the
+ * subscription is not in the READY state, because we cannot decide whether to
+ * apply the change as we won't know remote_final_lsn by that time.
+ *
+ * We already checked this in pa_can_start() before assigning the
+ * streaming transaction to the parallel worker, but it also needs to be
+ * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
+ * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
+ * while applying this transaction.
*/
static bool
should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
{
if (am_tablesync_worker())
return MyLogicalRepWorker->relid == rel->localreloid;
+ else if (am_parallel_apply_worker())
+ {
+ /* We don't synchronize rel's that are in unknown state. */
+ if (rel->state != SUBREL_STATE_READY &&
+ rel->state != SUBREL_STATE_UNKNOWN)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+ return rel->state == SUBREL_STATE_READY;
+ }
else
return (rel->state == SUBREL_STATE_READY ||
(rel->state == SUBREL_STATE_SYNCDONE &&
@@ -454,43 +550,110 @@ end_replication_step(void)
}
/*
- * Handle streamed transactions.
+ * Handle streamed transactions for both the leader apply worker and the
+ * parallel apply workers.
+ *
+ * In the streaming case (receiving a block of the streamed transaction), for
+ * serialize mode, simply redirect it to a file for the proper toplevel
+ * transaction, and for parallel mode, the leader apply worker will send the
+ * changes to parallel apply workers and the parallel apply worker will define
+ * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
+ * messages will be applied by both leader apply worker and parallel apply
+ * workers).
*
- * If in streaming mode (receiving a block of streamed transaction), we
- * simply redirect it to a file for the proper toplevel transaction.
+ * Returns true for streamed transactions (when the change is either serialized
+ * to file or sent to parallel apply worker), false otherwise (regular mode or
+ * needs to be processed by parallel apply worker).
*
- * Returns true for streamed transactions, false otherwise (regular mode).
+ * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
+ * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
+ * to a parallel apply worker.
*/
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
{
- TransactionId xid;
+ TransactionId current_xid;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+ StringInfoData original_msg;
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
- if (!in_streamed_transaction)
+ if (apply_action == TRANS_LEADER_APPLY)
return false;
- Assert(stream_fd != NULL);
Assert(TransactionIdIsValid(stream_xid));
/*
+ * The parallel apply worker needs the xid in this message to decide
+ * whether to define a savepoint, so save the original message that has
+ * not moved the cursor after the xid. We will serialize this message to a
+ * file in PARTIAL_SERIALIZE mode.
+ */
+ original_msg = *s;
+
+ /*
* We should have received XID of the subxact as the first part of the
* message, so extract it.
*/
- xid = pq_getmsgint(s, 4);
+ current_xid = pq_getmsgint(s, 4);
- if (!TransactionIdIsValid(xid))
+ if (!TransactionIdIsValid(current_xid))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- /* Add the new subxact to the array (unless already there). */
- subxact_info_add(xid);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
+ Assert(stream_fd);
+
+ /* Add the new subxact to the array (unless already there). */
+ subxact_info_add(current_xid);
- /* write the change to the current file */
- stream_write_change(action, s);
+ /* Write the change to the current file */
+ stream_write_change(action, s);
+ return true;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * XXX The publisher side doesn't always send relation/type update
+ * messages after the streaming transaction, so also update the
+ * relation/type in leader apply worker. See function
+ * cleanup_rel_sync_cache.
+ */
+ if (pa_send_data(winfo, s->len, s->data))
+ return (action != LOGICAL_REP_MSG_RELATION &&
+ action != LOGICAL_REP_MSG_TYPE);
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, false);
- return true;
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ stream_write_change(action, &original_msg);
+
+ /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
+ return (action != LOGICAL_REP_MSG_RELATION &&
+ action != LOGICAL_REP_MSG_TYPE);
+
+ case TRANS_PARALLEL_APPLY:
+ parallel_stream_nchanges += 1;
+
+ /* Define a savepoint for a subxact if needed. */
+ pa_start_subtrans(current_xid, stream_xid);
+ return false;
+
+ default:
+ Assert(false);
+ return false; /* silence compiler warning */
+ }
}
/*
@@ -928,8 +1091,11 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
* called within the PrepareTransactionBlock below.
*/
- BeginTransactionBlock();
- CommitTransactionCommand(); /* Completes the preceding Begin command. */
+ if (!IsTransactionBlock())
+ {
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+ }
/*
* Update origin state so we can restart streaming from correct position
@@ -976,7 +1142,7 @@ apply_handle_prepare(StringInfo s)
CommitTransactionCommand();
pgstat_report_stat(false);
- store_flush_position(prepare_data.end_lsn);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
@@ -998,6 +1164,12 @@ apply_handle_prepare(StringInfo s)
/*
* Handle a COMMIT PREPARED of a previously PREPARED transaction.
+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. In that case, we have already waited for the prepare
+ * to finish in apply_handle_stream_prepare() which will ensure all the
+ * operations in that transaction have happened in the subscriber, so no
+ * concurrent transaction can cause deadlock or transaction dependency issues.
*/
static void
apply_handle_commit_prepared(StringInfo s)
@@ -1027,7 +1199,7 @@ apply_handle_commit_prepared(StringInfo s)
CommitTransactionCommand();
pgstat_report_stat(false);
- store_flush_position(prepare_data.end_lsn);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
@@ -1041,6 +1213,12 @@ apply_handle_commit_prepared(StringInfo s)
/*
* Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
+ *
+ * Note that we don't need to wait here if the transaction was prepared in a
+ * parallel apply worker. In that case, we have already waited for the prepare
+ * to finish in apply_handle_stream_prepare() which will ensure all the
+ * operations in that transaction have happened in the subscriber, so no
+ * concurrent transaction can cause deadlock or transaction dependency issues.
*/
static void
apply_handle_rollback_prepared(StringInfo s)
@@ -1082,7 +1260,7 @@ apply_handle_rollback_prepared(StringInfo s)
pgstat_report_stat(false);
- store_flush_position(rollback_data.rollback_end_lsn);
+ store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
@@ -1094,15 +1272,16 @@ apply_handle_rollback_prepared(StringInfo s)
/*
* Handle STREAM PREPARE.
- *
- * Logic is in two parts:
- * 1. Replay all the spooled operations
- * 2. Mark the transaction as prepared
*/
static void
apply_handle_stream_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1118,24 +1297,98 @@ apply_handle_stream_prepare(StringInfo s)
logicalrep_read_stream_prepare(s, &prepare_data);
set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
- elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
+ apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
- /* Replay all the spooled operations. */
- apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
- /* Mark the transaction as prepared. */
- apply_handle_prepare_internal(&prepare_data);
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */
+ apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
+ prepare_data.xid, prepare_data.prepare_lsn);
- CommitTransactionCommand();
+ /* Mark the transaction as prepared. */
+ apply_handle_prepare_internal(&prepare_data);
- pgstat_report_stat(false);
+ CommitTransactionCommand();
- store_flush_position(prepare_data.end_lsn);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
- in_remote_transaction = false;
+ in_remote_transaction = false;
+
+ /* Unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+
+ elog(DEBUG1, "finished processing the STREAM PREPARE command");
+ break;
- /* unlink the files with serialized changes and subxact info. */
- stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, prepare_data.end_lsn);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_open_and_write_change(prepare_data.xid,
+ LOGICAL_REP_MSG_STREAM_PREPARE,
+ &original_msg);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, prepare_data.end_lsn);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before preparing.
+ */
+ if (stream_fd)
+ stream_close_file();
+
+ begin_replication_step();
+
+ /* Mark the transaction as prepared. */
+ apply_handle_prepare_internal(&prepare_data);
+
+ end_replication_step();
+
+ CommitTransactionCommand();
+
+ MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
+ pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+
+ pa_reset_subtrans();
+
+ elog(DEBUG1, "finished processing the STREAM PREPARE command");
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
+
+ pgstat_report_stat(false);
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(prepare_data.end_lsn);
@@ -1173,27 +1426,64 @@ apply_handle_origin(StringInfo s)
}
/*
+ * Initialize fileset (if not already done).
+ *
+ * Create a new file when first_segment is true, otherwise open the existing
+ * file.
+ */
+void
+stream_start_internal(TransactionId xid, bool first_segment)
+{
+ begin_replication_step();
+
+ /*
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
+ */
+ if (!MyLogicalRepWorker->stream_fileset)
+ {
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+ FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+ MemoryContextSwitchTo(oldctx);
+ }
+
+ /* Open the spool file for this transaction. */
+ stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
+
+ /* If this is not the first segment, open existing subxact file. */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, xid);
+
+ end_replication_step();
+}
+
+/*
* Handle STREAM START message.
*/
static void
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
if (in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("duplicate STREAM START message")));
- /*
- * Start a transaction on stream start, this transaction will be committed
- * on the stream stop unless it is a tablesync worker in which case it
- * will be committed after processing all the messages. We need the
- * transaction for handling the buffile, used for serializing the
- * streaming data and subxact info.
- */
- begin_replication_step();
-
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true;
@@ -1207,54 +1497,119 @@ apply_handle_stream_start(StringInfo s)
set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
- /*
- * Initialize the worker's stream_fileset if we haven't yet. This will be
- * used for the entire duration of the worker so create it in a permanent
- * context. We create this on the very first streaming message from any
- * transaction and then use it for this and other streaming transactions.
- * Now, we could create a fileset at the start of the worker as well but
- * then we won't be sure that it will ever be used.
- */
- if (MyLogicalRepWorker->stream_fileset == NULL)
+ /* Try to allocate a worker for the streaming transaction. */
+ if (first_segment)
+ pa_allocate_worker(stream_xid);
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);
+
+ switch (apply_action)
{
- MemoryContext oldctx;
+ case TRANS_LEADER_SERIALIZE:
- oldctx = MemoryContextSwitchTo(ApplyContext);
+ /*
+ * Function stream_start_internal starts a transaction. This
+ * transaction will be committed on the stream stop unless it is a
+ * tablesync worker in which case it will be committed after
+ * processing all the messages. We need this transaction for
+ * handling the BufFile, used for serializing the streaming data
+ * and subxact info.
+ */
+ stream_start_internal(stream_xid, first_segment);
+ break;
- MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
- FileSetInit(MyLogicalRepWorker->stream_fileset);
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
- MemoryContextSwitchTo(oldctx);
- }
+ /*
+ * Once we start serializing the changes, the parallel apply
+ * worker will wait for the leader to release the stream lock
+ * until the end of the transaction. So, we don't need to release
+ * the lock or increment the stream count in that case.
+ */
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /*
+ * Unlock the shared object lock so that the parallel apply
+ * worker can continue to receive changes.
+ */
+ if (!first_segment)
+ pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
- /* open the spool file for this transaction */
- stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+ /*
+ * Increment the number of streaming blocks waiting to be
+ * processed by parallel apply worker.
+ */
+ pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
- /* if this is not the first segment, open existing subxact file */
- if (!first_segment)
- subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+ /* Cache the parallel apply worker for this transaction. */
+ pa_set_stream_apply_worker(winfo);
+ break;
+ }
- pgstat_report_activity(STATE_RUNNING, NULL);
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, !first_segment);
- end_replication_step();
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ /*
+ * Open the spool file unless it was already opened when switching
+ * to serialize mode. The transaction started in
+ * stream_start_internal will be committed on the stream stop.
+ */
+ if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
+ stream_start_internal(stream_xid, first_segment);
+
+ stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
+
+ /* Cache the parallel apply worker for this transaction. */
+ pa_set_stream_apply_worker(winfo);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+ if (first_segment)
+ {
+ /* Hold the lock until the end of the transaction. */
+ pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
+
+ /*
+ * Signal the leader apply worker, as it may be waiting for
+ * us.
+ */
+ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ }
+
+ parallel_stream_nchanges = 0;
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
}
/*
- * Handle STREAM STOP message.
+ * Update the information about subxacts and close the file.
+ *
+ * This function should be called when the stream_start_internal function has
+ * been called.
*/
-static void
-apply_handle_stream_stop(StringInfo s)
+void
+stream_stop_internal(TransactionId xid)
{
- if (!in_streamed_transaction)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM STOP message without STREAM START")));
-
/*
- * Close the file with serialized changes, and serialize information about
- * subxacts for the toplevel transaction.
+ * Serialize information about subxacts for the toplevel transaction, then
+ * close the stream messages spool file.
*/
- subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
stream_close_file();
/* We must be in a valid transaction state */
@@ -1263,40 +1618,124 @@ apply_handle_stream_stop(StringInfo s)
/* Commit the per-stream transaction */
CommitTransactionCommand();
- in_streamed_transaction = false;
-
/* Reset per-stream context */
MemoryContextReset(LogicalStreamingContext);
-
- pgstat_report_activity(STATE_IDLE, NULL);
- reset_apply_error_context_info();
}
/*
- * Handle STREAM abort message.
+ * Handle STREAM STOP message.
*/
static void
-apply_handle_stream_abort(StringInfo s)
+apply_handle_stream_stop(StringInfo s)
{
- TransactionId xid;
- TransactionId subxid;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
- if (in_streamed_transaction)
+ if (!in_streamed_transaction)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM ABORT message without STREAM STOP")));
+ errmsg_internal("STREAM STOP message without STREAM START")));
+
+ apply_action = get_transaction_apply_action(stream_xid, &winfo);
+
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
+ stream_stop_internal(stream_xid);
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * Lock before sending the STREAM_STOP message so that the leader
+ * can hold the lock first and the parallel apply worker will wait
+ * for leader to release the lock. See Locking Considerations atop
+ * applyparallelworker.c.
+ */
+ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ pa_set_stream_apply_worker(NULL);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
+ stream_stop_internal(stream_xid);
+ pa_set_stream_apply_worker(NULL);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+ elog(DEBUG1, "applied %u changes in the streaming chunk",
+ parallel_stream_nchanges);
+
+ /*
+ * By the time parallel apply worker is processing the changes in
+ * the current streaming block, the leader apply worker may have
+ * sent multiple streaming blocks. This can lead to parallel apply
+ * worker start waiting even when there are more chunk of streams
+ * in the queue. So, try to lock only if there is no message left
+ * in the queue. See Locking Considerations atop
+ * applyparallelworker.c.
+ *
+ * Note that here we have a race condition where we can start
+ * waiting even when there are pending streaming chunks. This can
+ * happen if the leader sends another streaming block and acquires
+ * the stream lock again after the parallel apply worker checks
+ * that there is no pending streaming block and before it actually
+ * starts waiting on a lock. We can handle this case by not
+ * allowing the leader to increment the stream block count during
+ * the time parallel apply worker acquires the lock but it is not
+ * clear whether that is worth the complexity.
+ *
+ * Now, if this missed chunk contains rollback to savepoint, then
+ * there is a risk of deadlock which probably shouldn't happen
+ * after restart.
+ */
+ pa_decr_and_wait_stream_block();
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
+
+ in_streamed_transaction = false;
- logicalrep_read_stream_abort(s, &xid, &subxid);
+ /*
+ * The parallel apply worker could be in a transaction in which case we
+ * need to report the state as STATE_IDLEINTRANSACTION.
+ */
+ if (IsTransactionOrTransactionBlock())
+ pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
+ else
+ pgstat_report_activity(STATE_IDLE, NULL);
+ reset_apply_error_context_info();
+}
+
+/*
+ * Helper function to handle STREAM ABORT message when the transaction was
+ * serialized to file.
+ */
+static void
+stream_abort_internal(TransactionId xid, TransactionId subxid)
+{
/*
* If the two XIDs are the same, it's in fact abort of toplevel xact, so
* just delete the files with serialized info.
*/
if (xid == subxid)
- {
- set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
- }
else
{
/*
@@ -1320,8 +1759,6 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
-
subidx = -1;
begin_replication_step();
subxact_info_read(MyLogicalRepWorker->subid, xid);
@@ -1346,7 +1783,6 @@ apply_handle_stream_abort(StringInfo s)
cleanup_subxact_info();
end_replication_step();
CommitTransactionCommand();
- reset_apply_error_context_info();
return;
}
@@ -1369,24 +1805,215 @@ apply_handle_stream_abort(StringInfo s)
end_replication_step();
CommitTransactionCommand();
}
+}
+
+/*
+ * Handle STREAM ABORT message.
+ */
+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+ LogicalRepStreamAbortData abort_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
+ bool toplevel_xact;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM ABORT message without STREAM STOP")));
+
+ /* We receive abort information only when we can apply in parallel. */
+ logicalrep_read_stream_abort(s, &abort_data,
+ MyLogicalRepWorker->parallel_apply);
+
+ xid = abort_data.xid;
+ subxid = abort_data.subxid;
+ toplevel_xact = (xid == subxid);
+
+ set_apply_error_context_xact(subxid, abort_data.abort_lsn);
+
+ apply_action = get_transaction_apply_action(xid, &winfo);
+
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
+
+ /*
+ * We are in the leader apply worker and the transaction has been
+ * serialized to file.
+ */
+ stream_abort_internal(xid, subxid);
+
+ elog(DEBUG1, "finished processing the STREAM ABORT command");
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ /*
+ * For the case of aborting the subtransaction, we increment the
+ * number of streaming blocks and take the lock again before
+ * sending the STREAM_ABORT to ensure that the parallel apply
+ * worker will wait on the lock for the next set of changes after
+ * processing the STREAM_ABORT message if it is not already
+ * waiting for STREAM_STOP message.
+ *
+ * It is important to perform this locking before sending the
+ * STREAM_ABORT message so that the leader can hold the lock first
+ * and the parallel apply worker will wait for the leader to
+ * release the lock. This is the same as what we do in
+ * apply_handle_stream_stop. See Locking Considerations atop
+ * applyparallelworker.c.
+ */
+ if (!toplevel_xact)
+ {
+ pa_unlock_stream(xid, AccessExclusiveLock);
+ pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
+ pa_lock_stream(xid, AccessExclusiveLock);
+ }
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /*
+ * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
+ * wait here for the parallel apply worker to finish as that
+ * is not required to maintain the commit order and won't have
+ * the risk of failures due to transaction dependencies and
+ * deadlocks. However, it is possible that before the parallel
+ * worker finishes and we clear the worker info, the xid
+ * wraparound happens on the upstream and a new transaction
+ * with the same xid can appear and that can lead to duplicate
+ * entries in ParallelApplyTxnHash. Yet another problem could
+ * be that we may have serialized the changes in partial
+ * serialize mode and the file containing xact changes may
+ * already exist, and after xid wraparound trying to create
+ * the file for the same xid can lead to an error. To avoid
+ * these problems, we decide to wait for the aborts to finish.
+ *
+ * Note, it is okay to not update the flush location position
+ * for aborts as in worst case that means such a transaction
+ * won't be sent again after restart.
+ */
+ if (toplevel_xact)
+ pa_xact_finish(winfo, InvalidXLogRecPtr);
+
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ /*
+ * Parallel apply worker might have applied some changes, so write
+ * the STREAM_ABORT message so that it can rollback the
+ * subtransaction if needed.
+ */
+ stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
+ &original_msg);
+
+ if (toplevel_xact)
+ {
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+ pa_xact_finish(winfo, InvalidXLogRecPtr);
+ }
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before aborting.
+ */
+ if (toplevel_xact && stream_fd)
+ stream_close_file();
+
+ pa_stream_abort(&abort_data);
+
+ /*
+ * We need to wait after processing rollback to savepoint for the
+ * next set of changes.
+ *
+ * We have a race condition here due to which we can start waiting
+ * here when there are more chunk of streams in the queue. See
+ * apply_handle_stream_stop.
+ */
+ if (!toplevel_xact)
+ pa_decr_and_wait_stream_block();
+
+ elog(DEBUG1, "finished processing the STREAM ABORT command");
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
reset_apply_error_context_info();
}
/*
- * Common spoolfile processing.
+ * Ensure that the passed location is fileset's end.
*/
static void
-apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
+ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
+ off_t offset)
+{
+ char path[MAXPGPATH];
+ BufFile *fd;
+ int last_fileno;
+ off_t last_offset;
+
+ Assert(!IsTransactionState());
+
+ begin_replication_step();
+
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+
+ fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+
+ BufFileSeek(fd, 0, 0, SEEK_END);
+ BufFileTell(fd, &last_fileno, &last_offset);
+
+ BufFileClose(fd);
+
+ end_replication_step();
+
+ if (last_fileno != fileno || last_offset != offset)
+ elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
+ path);
+}
+
+/*
+ * Common spoolfile processing.
+ */
+void
+apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
+ XLogRecPtr lsn)
{
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
MemoryContext oldcxt;
- BufFile *fd;
+ ResourceOwner oldowner;
+ int fileno;
+ off_t offset;
- maybe_start_skipping_changes(lsn);
+ if (!am_parallel_apply_worker())
+ maybe_start_skipping_changes(lsn);
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1402,8 +2029,16 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
- false);
+ /*
+ * Make sure the file is owned by the toplevel transaction so that the
+ * file will not be accidentally closed when aborting a subtransaction.
+ */
+ oldowner = CurrentResourceOwner;
+ CurrentResourceOwner = TopTransactionResourceOwner;
+
+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+
+ CurrentResourceOwner = oldowner;
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -1434,7 +2069,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
CHECK_FOR_INTERRUPTS();
/* read length of the on-disk record */
- nbytes = BufFileRead(fd, &len, sizeof(len));
+ nbytes = BufFileRead(stream_fd, &len, sizeof(len));
/* have we reached end of the file? */
if (nbytes == 0)
@@ -1455,12 +2090,14 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
buffer = repalloc(buffer, len);
/* and finally read the data into the buffer */
- if (BufFileRead(fd, buffer, len) != len)
+ if (BufFileRead(stream_fd, buffer, len) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
+ BufFileTell(stream_fd, &fileno, &offset);
+
/* copy the buffer to the stringinfo and call apply_dispatch */
resetStringInfo(&s2);
appendBinaryStringInfo(&s2, buffer, len);
@@ -1476,15 +2113,24 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
nchanges++;
+ /*
+ * It is possible the file has been closed because we have processed
+ * the transaction end message like stream_commit in which case that
+ * must be the last message.
+ */
+ if (!stream_fd)
+ {
+ ensure_last_message(stream_fileset, xid, fileno, offset);
+ break;
+ }
+
if (nchanges % 1000 == 0)
elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);
}
- BufFileClose(fd);
-
- pfree(buffer);
- pfree(s2.data);
+ if (stream_fd)
+ stream_close_file();
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
@@ -1500,6 +2146,11 @@ apply_handle_stream_commit(StringInfo s)
{
TransactionId xid;
LogicalRepCommitData commit_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1509,14 +2160,85 @@ apply_handle_stream_commit(StringInfo s)
xid = logicalrep_read_stream_commit(s, &commit_data);
set_apply_error_context_xact(xid, commit_data.commit_lsn);
- elog(DEBUG1, "received commit for streamed transaction %u", xid);
+ apply_action = get_transaction_apply_action(xid, &winfo);
- apply_spooled_messages(xid, commit_data.commit_lsn);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_SERIALIZE:
- apply_handle_commit_internal(&commit_data);
+ /*
+ * The transaction has been serialized to file, so replay all the
+ * spooled operations.
+ */
+ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
+ commit_data.commit_lsn);
+
+ apply_handle_commit_internal(&commit_data);
+
+ /* Unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+ elog(DEBUG1, "finished processing the STREAM COMMIT command");
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, commit_data.end_lsn);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
+ &original_msg);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+ /* Finish processing the streaming transaction. */
+ pa_xact_finish(winfo, commit_data.end_lsn);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before committing.
+ */
+ if (stream_fd)
+ stream_close_file();
+
+ apply_handle_commit_internal(&commit_data);
+
+ MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+ /*
+ * It is important to set the transaction state as finished before
+ * releasing the lock. See pa_wait_for_xact_finish.
+ */
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
+ pa_unlock_transaction(xid, AccessExclusiveLock);
- /* unlink the files with serialized changes and subxact info */
- stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ pa_reset_subtrans();
+
+ elog(DEBUG1, "finished processing the STREAM COMMIT command");
+ break;
+
+ default:
+ Assert(false);
+ break;
+ }
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
@@ -1560,9 +2282,16 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
replorigin_session_origin_timestamp = commit_data->committime;
CommitTransactionCommand();
+
+ if (IsTransactionBlock())
+ {
+ EndTransactionBlock(false);
+ CommitTransactionCommand();
+ }
+
pgstat_report_stat(false);
- store_flush_position(commit_data->end_lsn);
+ store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
}
else
{
@@ -2508,7 +3237,7 @@ apply_handle_truncate(StringInfo s)
/*
* Logical replication protocol message dispatcher.
*/
-static void
+void
apply_dispatch(StringInfo s)
{
LogicalRepMsgType action = pq_getmsgbyte(s);
@@ -2672,17 +3401,24 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
/*
* Store current remote/local lsn pair in the tracking list.
*/
-static void
-store_flush_position(XLogRecPtr remote_lsn)
+void
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
{
FlushPosition *flushpos;
+ /*
+ * Skip for parallel apply workers, because the lsn_mapping is maintained
+ * by the leader apply worker.
+ */
+ if (am_parallel_apply_worker())
+ return;
+
/* Need to do this in permanent context */
MemoryContextSwitchTo(ApplyContext);
/* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
- flushpos->local_end = XactLastCommitEnd;
+ flushpos->local_end = local_lsn;
flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node);
@@ -2741,6 +3477,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
errcallback.callback = apply_error_callback;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
+ apply_error_context_stack = error_context_stack;
/* This outer loop iterates once per wait. */
for (;;)
@@ -2955,6 +3692,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* Pop the error context stack */
error_context_stack = errcallback.previous;
+ apply_error_context_stack = error_context_stack;
/* All done */
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
@@ -3053,9 +3791,31 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
}
/*
- * Reread subscription info if needed. Most changes will be exit.
+ * Exit routine for apply workers due to subscription parameter changes.
*/
static void
+apply_worker_exit(void)
+{
+ if (am_parallel_apply_worker())
+ {
+ /*
+ * Don't stop the parallel apply worker as the leader will detect the
+ * subscription parameter change and restart logical replication later
+ * anyway. This also prevents the leader from reporting errors when
+ * trying to communicate with a stopped parallel apply worker, which
+ * would accidentally disable subscriptions if disable_on_error was
+ * set.
+ */
+ return;
+ }
+
+ proc_exit(0);
+}
+
+/*
+ * Reread subscription info if needed. Most changes will be exit.
+ */
+void
maybe_reread_subscription(void)
{
MemoryContext oldctx;
@@ -3085,9 +3845,9 @@ maybe_reread_subscription(void)
if (!newsub)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was removed",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
+ get_worker_name(), MySubscription->name)));
proc_exit(0);
}
@@ -3096,11 +3856,11 @@ maybe_reread_subscription(void)
if (!newsub->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will "
- "stop because the subscription was disabled",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
}
/* !slotname should never happen when enabled is true. */
@@ -3111,7 +3871,10 @@ maybe_reread_subscription(void)
/*
* Exit if any parameter that affects the remote connection was changed.
- * The launcher will start a new worker.
+ * The launcher will start a new worker but note that the parallel apply
+ * worker won't restart if the streaming option's value is changed from
+ * 'parallel' to any other value or the server decides not to stream the
+ * in-progress transaction.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
strcmp(newsub->name, MySubscription->name) != 0 ||
@@ -3122,11 +3885,17 @@ maybe_reread_subscription(void)
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
- ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
- MySubscription->name)));
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
}
/* Check for other changes that should never happen too. */
@@ -3378,7 +4147,7 @@ changes_filename(char *path, Oid subid, TransactionId xid)
* toplevel transaction. Each subscription has a separate set of files
* for any toplevel transaction.
*/
-static void
+void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
@@ -3401,9 +4170,6 @@ stream_cleanup_files(Oid subid, TransactionId xid)
* by stream_xid (global variable). If it's the first chunk of streamed
* changes for this transaction, create the buffile, otherwise open the
* previously created file.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
*/
static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
@@ -3411,7 +4177,6 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
char path[MAXPGPATH];
MemoryContext oldcxt;
- Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
@@ -3450,15 +4215,10 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
/*
* stream_close_file
* Close the currently open file with streamed changes.
- *
- * This can only be called at the end of a streaming block, i.e. at stream_stop
- * message from the upstream.
*/
static void
stream_close_file(void)
{
- Assert(in_streamed_transaction);
- Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
BufFileClose(stream_fd);
@@ -3480,8 +4240,6 @@ stream_write_change(char action, StringInfo s)
{
int len;
- Assert(in_streamed_transaction);
- Assert(TransactionIdIsValid(stream_xid));
Assert(stream_fd != NULL);
/* total on-disk size, including the action type character */
@@ -3500,6 +4258,26 @@ stream_write_change(char action, StringInfo s)
}
/*
+ * stream_open_and_write_change
+ * Serialize a message to a file for the given transaction.
+ *
+ * This function is similar to stream_write_change except that it will open the
+ * target file if not already before writing the message and close the file at
+ * the end.
+ */
+static void
+stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
+{
+ Assert(!in_streamed_transaction);
+
+ if (!stream_fd)
+ stream_start_internal(xid, false);
+
+ stream_write_change(action, s);
+ stream_stop_internal(xid);
+}
+
+/*
* Cleanup the memory for subxacts and reset the related variables.
*/
static inline void
@@ -3610,37 +4388,16 @@ start_apply(XLogRecPtr origin_startpos)
PG_END_TRY();
}
-/* Logical Replication Apply worker entry point */
+/*
+ * Common initialization for leader apply worker and parallel apply worker.
+ *
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
{
- int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos = InvalidXLogRecPtr;
- char *myslotname = NULL;
- WalRcvStreamOptions options;
- int server_version;
-
- /* Attach to slot */
- logicalrep_worker_attach(worker_slot);
-
- /* Setup signal handling */
- pqsignal(SIGHUP, SignalHandlerForConfigReload);
- pqsignal(SIGTERM, die);
- BackgroundWorkerUnblockSignals();
-
- /*
- * We don't currently need any ResourceOwner in a walreceiver process, but
- * if we did, we could call CreateAuxProcessResourceOwner here.
- */
-
- /* Initialise stats to a sanish value */
- MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
- MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
/* Run as replica session replication role. */
SetConfigOption("session_replication_role", "replica",
@@ -3668,9 +4425,10 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription %u will not "
- "start because the subscription was removed during startup",
- MyLogicalRepWorker->subid)));
+ /* translator: %s is the name of logical replication worker */
+ (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
+ get_worker_name(), MyLogicalRepWorker->subid)));
+
proc_exit(0);
}
@@ -3680,11 +4438,11 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
+ get_worker_name(), MySubscription->name)));
- proc_exit(0);
+ apply_worker_exit();
}
/* Setup synchronous commit according to the user's wishes */
@@ -3699,13 +4457,49 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
- MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));
else
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" has started",
- MySubscription->name)));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" has started",
+ get_worker_name(), MySubscription->name)));
CommitTransactionCommand();
+}
+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *myslotname = NULL;
+ WalRcvStreamOptions options;
+ int server_version;
+
+ /* Attach to slot */
+ logicalrep_worker_attach(worker_slot);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */
+
+ /* Initialise stats to a sanish value */
+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ InitializeApplyWorker();
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
@@ -3715,20 +4509,15 @@ ApplyWorkerMain(Datum main_arg)
{
start_table_sync(&origin_startpos, &myslotname);
- /*
- * Allocate the origin name in long-lived context for error context
- * message.
- */
ReplicationOriginNameForLogicalRep(MySubscription->oid,
MyLogicalRepWorker->relid,
originname,
sizeof(originname));
- apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
- originname);
+ set_apply_error_context_origin(originname);
}
else
{
- /* This is main apply worker */
+ /* This is the leader apply worker */
RepOriginId originid;
TimeLineID startpointTLI;
char *err;
@@ -3752,7 +4541,7 @@ ApplyWorkerMain(Datum main_arg)
originid = replorigin_by_name(originname, true);
if (!OidIsValid(originid))
originid = replorigin_create(originname);
- replorigin_session_setup(originid);
+ replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
@@ -3770,12 +4559,7 @@ ApplyWorkerMain(Datum main_arg)
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
- /*
- * Allocate the origin name in long-lived context for error context
- * message.
- */
- apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
- originname);
+ set_apply_error_context_origin(originname);
}
/*
@@ -3793,13 +4577,36 @@ ApplyWorkerMain(Datum main_arg)
server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
options.proto.logical.proto_version =
+ server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
- options.proto.logical.streaming = MySubscription->stream;
+
+ /*
+ * Assign the appropriate option value for streaming option according to
+ * the 'streaming' mode and the publisher's ability to support that mode.
+ */
+ if (server_version >= 160000 &&
+ MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+ {
+ options.proto.logical.streaming_str = "parallel";
+ MyLogicalRepWorker->parallel_apply = true;
+ }
+ else if (server_version >= 140000 &&
+ MySubscription->stream != LOGICALREP_STREAM_OFF)
+ {
+ options.proto.logical.streaming_str = "on";
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+ else
+ {
+ options.proto.logical.streaming_str = NULL;
+ MyLogicalRepWorker->parallel_apply = false;
+ }
+
options.proto.logical.twophase = false;
options.proto.logical.origin = pstrdup(MySubscription->origin);
@@ -3897,6 +4704,15 @@ IsLogicalWorker(void)
}
/*
+ * Is current process a logical replication parallel apply worker?
+ */
+bool
+IsLogicalParallelApplyWorker(void)
+{
+ return IsLogicalWorker() && am_parallel_apply_worker();
+}
+
+/*
* Start skipping changes of the transaction if the given LSN matches the
* LSN specified by subscription's skiplsn.
*/
@@ -3958,7 +4774,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
XLogRecPtr myskiplsn = MySubscription->skiplsn;
bool started_tx = false;
- if (likely(XLogRecPtrIsInvalid(myskiplsn)))
+ if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
return;
if (!IsTransactionState())
@@ -4030,7 +4846,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
}
/* Error callback to give more context info about the change being applied */
-static void
+void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
@@ -4058,23 +4874,47 @@ apply_error_callback(void *arg)
errarg->remote_xid,
LSN_FORMAT_ARGS(errarg->finish_lsn));
}
- else if (errarg->remote_attnum < 0)
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
- errarg->origin_name,
- logicalrep_message_type(errarg->command),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname,
- errarg->remote_xid,
- LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
- errarg->origin_name,
- logicalrep_message_type(errarg->command),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname,
- errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid,
- LSN_FORMAT_ARGS(errarg->finish_lsn));
+ {
+ if (errarg->remote_attnum < 0)
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ else
+ {
+ if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
+ }
+ }
}
/* Set transaction information of apply error callback */
@@ -4144,3 +4984,53 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
/* The List storage will be reclaimed automatically in xact cleanup. */
on_commit_wakeup_workers_subids = NIL;
}
+
+/*
+ * Allocate the origin name in long-lived context for error context message.
+ */
+void
+set_apply_error_context_origin(char *originname)
+{
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
+}
+
+/*
+ * Return the action to be taken for the given transaction. *winfo is
+ * assigned to the destination parallel worker info when the leader apply
+ * worker has to pass all the transaction's changes to the parallel apply
+ * worker.
+ */
+static TransApplyAction
+get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
+{
+ *winfo = NULL;
+
+ if (am_parallel_apply_worker())
+ {
+ return TRANS_PARALLEL_APPLY;
+ }
+ else if (in_remote_transaction)
+ {
+ return TRANS_LEADER_APPLY;
+ }
+
+ /*
+ * Check if we are processing this transaction using a parallel apply
+ * worker.
+ */
+ *winfo = pa_find_worker(xid);
+
+ if (!*winfo)
+ {
+ return TRANS_LEADER_SERIALIZE;
+ }
+ else if ((*winfo)->serialize_changes)
+ {
+ return TRANS_LEADER_PARTIAL_SERIALIZE;
+ }
+ else
+ {
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+}