diff options
-rw-r--r-- | src/backend/replication/logical/worker.c | 73 |
1 files changed, 47 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d8b8a374c62..a0084c7ef69 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg * The action to be taken for the changes in the transaction. * * TRANS_LEADER_APPLY: - * This action means that we are in the leader apply worker and changes of the - * transaction are applied directly by the worker. + * This action means that we are in the leader apply worker or table sync + * worker. The changes of the transaction are either directly applied or + * are read from temporary files (for streaming transactions) and then + * applied by the worker. * * TRANS_LEADER_SERIALIZE: * This action means that we are in the leader apply worker or table sync @@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s) { LogicalRepBeginData begin_data; + /* There must not be an active streaming transaction. */ + Assert(!TransactionIdIsValid(stream_xid)); + logicalrep_read_begin(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); @@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); + /* There must not be an active streaming transaction. */ + Assert(!TransactionIdIsValid(stream_xid)); + logicalrep_read_begin_prepare(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); @@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * The transaction has been serialized to file, so replay all the @@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("duplicate STREAM START message"))); + /* There must not be an active streaming transaction. */ + Assert(!TransactionIdIsValid(stream_xid)); + /* notify handle methods we're processing a remote transaction */ in_streamed_transaction = true; @@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } in_streamed_transaction = false; + stream_xid = InvalidTransactionId; /* * The parallel apply worker could be in a transaction in which case we @@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * We are in the leader apply worker and the transaction has been @@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s) switch (apply_action) { - case TRANS_LEADER_SERIALIZE: + case TRANS_LEADER_APPLY: /* * The transaction has been serialized to file, so replay all the @@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s) break; default: - Assert(false); + elog(ERROR, "unexpected apply action: %d", (int) apply_action); break; } @@ -4204,7 +4216,6 @@ stream_close_file(void) BufFileClose(stream_fd); - stream_xid = InvalidTransactionId; stream_fd = NULL; } @@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname) } /* - * Return the action to be taken for the given transaction. *winfo is - * assigned to the destination parallel worker info when the leader apply - * worker has to pass all the transaction's changes to the parallel apply - * worker. + * Return the action to be taken for the given transaction. See + * TransApplyAction for information on each of the actions. + * + * *winfo is assigned to the destination parallel worker info when the leader + * apply worker has to pass all the transaction's changes to the parallel + * apply worker. */ static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) @@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo) { return TRANS_PARALLEL_APPLY; } - else if (in_remote_transaction) - { - return TRANS_LEADER_APPLY; - } /* - * Check if we are processing this transaction using a parallel apply - * worker. + * If we are processing this transaction using a parallel apply worker then + * either we send the changes to the parallel worker or if the worker is busy + * then serialize the changes to the file which will later be processed by + * the parallel worker. */ *winfo = pa_find_worker(xid); - if (!*winfo) + if (*winfo && (*winfo)->serialize_changes) { - return TRANS_LEADER_SERIALIZE; + return TRANS_LEADER_PARTIAL_SERIALIZE; } - else if ((*winfo)->serialize_changes) + else if (*winfo) { - return TRANS_LEADER_PARTIAL_SERIALIZE; + return TRANS_LEADER_SEND_TO_PARALLEL; + } + + /* + * If there is no parallel worker involved to process this transaction then + * we either directly apply the change or serialize it to a file which will + * later be applied when the transaction finish message is processed. + */ + else if (in_streamed_transaction) + { + return TRANS_LEADER_SERIALIZE; } else { - return TRANS_LEADER_SEND_TO_PARALLEL; + return TRANS_LEADER_APPLY; } } |