diff options
author | Amit Kapila <akapila@postgresql.org> | 2020-07-20 08:48:26 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2020-07-20 08:48:26 +0530 |
commit | 0bead9af484c1d0a67e690fda47011addaa5bc9d (patch) | |
tree | 246db2163552a439ac1b7903fd36cb02e73cea3c /src/backend/access | |
parent | d05b172a760e0ccb3008a2144f96053720000b12 (diff) | |
download | postgresql-0bead9af484c1d0a67e690fda47011addaa5bc9d.tar.gz postgresql-0bead9af484c1d0a67e690fda47011addaa5bc9d.zip |
Immediately WAL-log subtransaction and top-level XID association.
The logical decoding infrastructure needs to know which top-level
transaction the subxact belongs to, in order to decode all the
changes. Until now that might be delayed until commit, due to the
caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring
incremental decoding.
So we also write the assignment info into WAL immediately, as part
of the next WAL record (to minimize overhead) only when wal_level=logical.
We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is
required for avoiding overflow in the hot standby snapshot.
Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID.
Author: Tomas Vondra, Dilip Kumar, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/xact.c | 50 | ||||
-rw-r--r-- | src/backend/access/transam/xloginsert.c | 23 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 5 |
3 files changed, 76 insertions, 2 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b3ee7fa7ea0..bd4c3cf3258 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -191,6 +191,7 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool assigned; /* assigned to top-level XID */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -223,6 +224,7 @@ typedef struct SerializedTransactionState static TransactionStateData TopTransactionStateData = { .state = TRANS_DEFAULT, .blockState = TBLOCK_DEFAULT, + .assigned = false, }; /* @@ -5120,6 +5122,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->assigned = false; CurrentTransactionState = s; @@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * IsSubTransactionAssignmentPending + * + * This is used to decide whether we need to WAL log the top-level XID for + * operation in a subtransaction. We require that for logical decoding, see + * LogicalDecodingProcessRecord. + * + * This returns true if wal_level >= logical and we are inside a valid + * subtransaction, for which the assignment was not yet written to any WAL + * record. + */ +bool +IsSubTransactionAssignmentPending(void) +{ + /* wal_level has to be logical */ + if (!XLogLogicalInfoActive()) + return false; + + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have a XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + /* and it should not be already 'assigned' */ + return !CurrentTransactionState->assigned; +} + +/* + * MarkSubTransactionAssigned + * + * Mark the subtransaction assignment as completed. + */ +void +MarkSubTransactionAssigned(void) +{ + Assert(IsSubTransactionAssignmentPending()); + + CurrentTransactionState->assigned = true; +} diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index b21679f09eb..c526bb19281 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -89,11 +89,13 @@ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfXLogTransactionId) /* * An array of XLogRecData structs, to hold registered data. @@ -195,6 +197,10 @@ XLogResetInsertion(void) { int i; + /* reset the subxact assignment flag (if needed) */ + if (curinsert_flags & XLOG_INCLUDE_XID) + MarkSubTransactionAssigned(); + for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -398,7 +404,7 @@ void XLogSetRecordFlags(uint8 flags) { Assert(begininsert_called); - curinsert_flags = flags; + curinsert_flags |= flags; } /* @@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(replorigin_session_origin); } + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubTransactionAssignmentPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* update the flag (later used by XLogResetInsertion) */ + XLogSetRecordFlags(XLOG_INCLUDE_XID); + + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index cb76be4f469..a757baccfc5 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) state->decoded_record = record; state->record_origin = InvalidRepOriginId; + state->toplevel_xid = InvalidTransactionId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) { COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); } + else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) + { + COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ |