aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/worker.c73
1 files changed, 47 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d8b8a374c62..a0084c7ef69 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
* The action to be taken for the changes in the transaction.
*
* TRANS_LEADER_APPLY:
- * This action means that we are in the leader apply worker and changes of the
- * transaction are applied directly by the worker.
+ * This action means that we are in the leader apply worker or table sync
+ * worker. The changes of the transaction are either directly applied or
+ * are read from temporary files (for streaming transactions) and then
+ * applied by the worker.
*
* TRANS_LEADER_SERIALIZE:
* This action means that we are in the leader apply worker or table sync
@@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
{
LogicalRepBeginData begin_data;
+ /* There must not be an active streaming transaction. */
+ Assert(!TransactionIdIsValid(stream_xid));
+
logicalrep_read_begin(s, &begin_data);
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
@@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+ /* There must not be an active streaming transaction. */
+ Assert(!TransactionIdIsValid(stream_xid));
+
logicalrep_read_begin_prepare(s, &begin_data);
set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
@@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
switch (apply_action)
{
- case TRANS_LEADER_SERIALIZE:
+ case TRANS_LEADER_APPLY:
/*
* The transaction has been serialized to file, so replay all the
@@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s)
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
@@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("duplicate STREAM START message")));
+ /* There must not be an active streaming transaction. */
+ Assert(!TransactionIdIsValid(stream_xid));
+
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true;
@@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s)
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
@@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s)
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
in_streamed_transaction = false;
+ stream_xid = InvalidTransactionId;
/*
* The parallel apply worker could be in a transaction in which case we
@@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
switch (apply_action)
{
- case TRANS_LEADER_SERIALIZE:
+ case TRANS_LEADER_APPLY:
/*
* We are in the leader apply worker and the transaction has been
@@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s)
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
@@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
switch (apply_action)
{
- case TRANS_LEADER_SERIALIZE:
+ case TRANS_LEADER_APPLY:
/*
* The transaction has been serialized to file, so replay all the
@@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s)
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
@@ -4204,7 +4216,6 @@ stream_close_file(void)
BufFileClose(stream_fd);
- stream_xid = InvalidTransactionId;
stream_fd = NULL;
}
@@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
}
/*
- * Return the action to be taken for the given transaction. *winfo is
- * assigned to the destination parallel worker info when the leader apply
- * worker has to pass all the transaction's changes to the parallel apply
- * worker.
+ * Return the action to be taken for the given transaction. See
+ * TransApplyAction for information on each of the actions.
+ *
+ * *winfo is assigned to the destination parallel worker info when the leader
+ * apply worker has to pass all the transaction's changes to the parallel
+ * apply worker.
*/
static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
@@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
return TRANS_PARALLEL_APPLY;
}
- else if (in_remote_transaction)
- {
- return TRANS_LEADER_APPLY;
- }
/*
- * Check if we are processing this transaction using a parallel apply
- * worker.
+ * If we are processing this transaction using a parallel apply worker then
+ * either we send the changes to the parallel worker or if the worker is busy
+ * then serialize the changes to the file which will later be processed by
+ * the parallel worker.
*/
*winfo = pa_find_worker(xid);
- if (!*winfo)
+ if (*winfo && (*winfo)->serialize_changes)
{
- return TRANS_LEADER_SERIALIZE;
+ return TRANS_LEADER_PARTIAL_SERIALIZE;
}
- else if ((*winfo)->serialize_changes)
+ else if (*winfo)
{
- return TRANS_LEADER_PARTIAL_SERIALIZE;
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+
+ /*
+ * If there is no parallel worker involved to process this transaction then
+ * we either directly apply the change or serialize it to a file which will
+ * later be applied when the transaction finish message is processed.
+ */
+ else if (in_streamed_transaction)
+ {
+ return TRANS_LEADER_SERIALIZE;
}
else
{
- return TRANS_LEADER_SEND_TO_PARALLEL;
+ return TRANS_LEADER_APPLY;
}
}