diff options
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 18 | ||||
-rw-r--r-- | src/backend/replication/logical/decode.c | 28 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 283 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 39 |
4 files changed, 186 insertions, 182 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index ea463fb2e7c..e356c7ca675 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -358,43 +358,45 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: appendStringInfoString(ctx->out, " INSERT:"); - if (change->tp.newtuple == NULL) + if (change->data.tp.newtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); else tuple_to_stringinfo(ctx->out, tupdesc, - &change->tp.newtuple->tuple, + &change->data.tp.newtuple->tuple, false); break; case REORDER_BUFFER_CHANGE_UPDATE: appendStringInfoString(ctx->out, " UPDATE:"); - if (change->tp.oldtuple != NULL) + if (change->data.tp.oldtuple != NULL) { appendStringInfoString(ctx->out, " old-key:"); tuple_to_stringinfo(ctx->out, tupdesc, - &change->tp.oldtuple->tuple, + &change->data.tp.oldtuple->tuple, true); appendStringInfoString(ctx->out, " new-tuple:"); } - if (change->tp.newtuple == NULL) + if (change->data.tp.newtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); else tuple_to_stringinfo(ctx->out, tupdesc, - &change->tp.newtuple->tuple, + &change->data.tp.newtuple->tuple, false); break; case REORDER_BUFFER_CHANGE_DELETE: appendStringInfoString(ctx->out, " DELETE:"); /* if there was no PK, we only know that a delete happened */ - if (change->tp.oldtuple == NULL) + if (change->data.tp.oldtuple == NULL) appendStringInfoString(ctx->out, " (no-tuple-data)"); /* In DELETE, only the replica identity is present; display that */ else tuple_to_stringinfo(ctx->out, tupdesc, - &change->tp.oldtuple->tuple, + &change->data.tp.oldtuple->tuple, true); break; + default: + Assert(false); } MemoryContextSwitchTo(old); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index af3948e8a4a..414cfa95586 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -586,17 +586,17 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; - memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader)); - change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert, r->xl_len - SizeOfHeapInsert, - change->tp.newtuple); + change->data.tp.newtuple); } ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); @@ -626,7 +626,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; - memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); data = (char *) &xlhdr->header; @@ -634,11 +634,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen)); - change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); DecodeXLogTuple(data, xlhdr->t_len + SizeOfHeapHeader, - change->tp.newtuple); + change->data.tp.newtuple); /* skip over the rest of the tuple header */ data += SizeOfHeapHeader; /* skip over the tuple data */ @@ -648,10 +648,10 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { xlhdr = (xl_heap_header_len *) data; - change->tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); DecodeXLogTuple((char *) &xlhdr->header, xlhdr->t_len + SizeOfHeapHeader, - change->tp.oldtuple); + change->data.tp.oldtuple); data = (char *) &xlhdr->header; data += SizeOfHeapHeader; data += xlhdr->t_len; @@ -681,18 +681,18 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; - memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); /* old primary key stored */ if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, r->xl_len - SizeOfHeapDelete, - change->tp.oldtuple); + change->data.tp.oldtuple); } ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); } @@ -735,7 +735,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; - memcpy(&change->tp.relnode, &xlrec->node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); /* * CONTAINS_NEW_TUPLE will always be set currently as multi_insert @@ -746,9 +746,9 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - tuple = change->tp.newtuple; + tuple = change->data.tp.newtuple; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ad560345b40..4493930eda0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -80,26 +80,6 @@ #include "utils/relfilenodemap.h" #include "utils/tqual.h" -/* - * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds - * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE - * changes. We don't want to leak those internal values to external users - * though (they would just use switch()...default:) because that would make it - * harder to add to new user visible values. - * - * This needs to be synchronized with ReorderBufferChangeType! Adjust the - * StaticAssertExpr's in ReorderBufferAllocate if you add anything! - */ -typedef enum -{ - REORDER_BUFFER_CHANGE_INTERNAL_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_UPDATE, - REORDER_BUFFER_CHANGE_INTERNAL_DELETE, - REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, - REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, - REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID -} ReorderBufferChangeTypeInternal; - /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt { @@ -255,10 +235,6 @@ ReorderBufferAllocate(void) HASHCTL hash_ctl; MemoryContext new_ctx; - StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_INSERT == (int) REORDER_BUFFER_CHANGE_INSERT, "out of sync enums"); - StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_UPDATE == (int) REORDER_BUFFER_CHANGE_UPDATE, "out of sync enums"); - StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_DELETE == (int) REORDER_BUFFER_CHANGE_DELETE, "out of sync enums"); - /* allocate memory in own context, to have better accountability */ new_ctx = AllocSetContextCreate(CurrentMemoryContext, "ReorderBuffer", @@ -427,28 +403,28 @@ void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) { /* free contained data */ - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: - if (change->tp.newtuple) + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + if (change->data.tp.newtuple) { - ReorderBufferReturnTupleBuf(rb, change->tp.newtuple); - change->tp.newtuple = NULL; + ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple); + change->data.tp.newtuple = NULL; } - if (change->tp.oldtuple) + if (change->data.tp.oldtuple) { - ReorderBufferReturnTupleBuf(rb, change->tp.oldtuple); - change->tp.oldtuple = NULL; + ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple); + change->data.tp.oldtuple = NULL; } break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: - if (change->snapshot) + if (change->data.snapshot) { - ReorderBufferFreeSnap(rb, change->snapshot); - change->snapshot = NULL; + ReorderBufferFreeSnap(rb, change->data.snapshot); + change->data.snapshot = NULL; } break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -1086,7 +1062,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferChange *change; change = dlist_container(ReorderBufferChange, node, iter.cur); - Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); ReorderBufferReturnChange(rb, change); } @@ -1161,14 +1137,14 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) change = dlist_container(ReorderBufferChange, node, iter.cur); - Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); /* be careful about padding */ memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); - key.relnode = change->tuplecid.node; + key.relnode = change->data.tuplecid.node; - ItemPointerCopy(&change->tuplecid.tid, + ItemPointerCopy(&change->data.tuplecid.tid, &key.tid); ent = (ReorderBufferTupleCidEnt *) @@ -1178,22 +1154,22 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) &found); if (!found) { - ent->cmin = change->tuplecid.cmin; - ent->cmax = change->tuplecid.cmax; - ent->combocid = change->tuplecid.combocid; + ent->cmin = change->data.tuplecid.cmin; + ent->cmax = change->data.tuplecid.cmax; + ent->combocid = change->data.tuplecid.combocid; } else { - Assert(ent->cmin == change->tuplecid.cmin); + Assert(ent->cmin == change->data.tuplecid.cmin); Assert(ent->cmax == InvalidCommandId || - ent->cmax == change->tuplecid.cmax); + ent->cmax == change->data.tuplecid.cmax); /* * if the tuple got valid in this transaction and now got deleted * we already have a valid cmin stored. The cmax will be * InvalidCommandId though. */ - ent->cmax = change->tuplecid.cmax; + ent->cmax = change->data.tuplecid.cmax; } } } @@ -1367,33 +1343,33 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Relation relation = NULL; Oid reloid; - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: Assert(snapshot_now); - reloid = RelidByRelfilenode(change->tp.relnode.spcNode, - change->tp.relnode.relNode); + reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, + change->data.tp.relnode.relNode); /* * Catalog tuple without data, emitted while catalog was * in the process of being rewritten. */ if (reloid == InvalidOid && - change->tp.newtuple == NULL && - change->tp.oldtuple == NULL) + change->data.tp.newtuple == NULL && + change->data.tp.oldtuple == NULL) continue; else if (reloid == InvalidOid) elog(ERROR, "could not lookup relation %s", - relpathperm(change->tp.relnode, MAIN_FORKNUM)); + relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); relation = RelationIdGetRelation(reloid); if (relation == NULL) elog(ERROR, "could open relation descriptor %s", - relpathperm(change->tp.relnode, MAIN_FORKNUM)); + relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); if (RelationIsLogicallyLogged(relation)) { @@ -1440,7 +1416,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, { ReorderBufferFreeSnap(rb, snapshot_now); snapshot_now = - ReorderBufferCopySnap(rb, change->snapshot, + ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } /* @@ -1448,15 +1424,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * free. We could introduce refcounting for that, but for * now this seems infrequent enough not to care. */ - else if (change->snapshot->copied) + else if (change->data.snapshot->copied) { snapshot_now = - ReorderBufferCopySnap(rb, change->snapshot, + ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } else { - snapshot_now = change->snapshot; + snapshot_now = change->data.snapshot; } @@ -1465,11 +1441,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: - Assert(change->command_id != InvalidCommandId); + Assert(change->data.command_id != InvalidCommandId); - if (command_id < change->command_id) + if (command_id < change->data.command_id) { - command_id = change->command_id; + command_id = change->data.command_id; if (!snapshot_now->copied) { @@ -1712,8 +1688,8 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change = ReorderBufferGetChange(rb); - change->snapshot = snap; - change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; + change->data.snapshot = snap; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; ReorderBufferQueueChange(rb, xid, lsn, change); } @@ -1752,8 +1728,8 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change = ReorderBufferGetChange(rb); - change->command_id = cid; - change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; + change->data.command_id = cid; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; ReorderBufferQueueChange(rb, xid, lsn, change); } @@ -1773,13 +1749,13 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - change->tuplecid.node = node; - change->tuplecid.tid = tid; - change->tuplecid.cmin = cmin; - change->tuplecid.cmax = cmax; - change->tuplecid.combocid = combocid; + change->data.tuplecid.node = node; + change->data.tuplecid.tid = tid; + change->data.tuplecid.cmin = cmin; + change->data.tuplecid.cmax = cmax; + change->data.tuplecid.combocid = combocid; change->lsn = lsn; - change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; dlist_push_tail(&txn->tuplecids, &change->node); txn->ntuplecids++; @@ -2017,26 +1993,30 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ondisk = (ReorderBufferDiskChange *) rb->outbuf; memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: + case REORDER_BUFFER_CHANGE_INSERT: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: + case REORDER_BUFFER_CHANGE_UPDATE: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: + case REORDER_BUFFER_CHANGE_DELETE: { char *data; + ReorderBufferTupleBuf *oldtup, *newtup; Size oldlen = 0; Size newlen = 0; - if (change->tp.oldtuple) + oldtup = change->data.tp.oldtuple; + newtup = change->data.tp.newtuple; + + if (oldtup) oldlen = offsetof(ReorderBufferTupleBuf, data) - + change->tp.oldtuple->tuple.t_len + + oldtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); - if (change->tp.newtuple) + if (newtup) newlen = offsetof(ReorderBufferTupleBuf, data) - + change->tp.newtuple->tuple.t_len + + newtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); sz += oldlen; @@ -2051,26 +2031,27 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldlen) { - memcpy(data, change->tp.oldtuple, oldlen); + memcpy(data, oldtup, oldlen); data += oldlen; - Assert(&change->tp.oldtuple->header == change->tp.oldtuple->tuple.t_data); } if (newlen) { - memcpy(data, change->tp.newtuple, newlen); + memcpy(data, newtup, newlen); data += newlen; - Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data); } break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { + Snapshot snap; char *data; + snap = change->data.snapshot; + sz += sizeof(SnapshotData) + - sizeof(TransactionId) * change->snapshot->xcnt + - sizeof(TransactionId) * change->snapshot->subxcnt + sizeof(TransactionId) * snap->xcnt + + sizeof(TransactionId) * snap->subxcnt ; /* make sure we have enough space */ @@ -2079,21 +2060,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* might have been reallocated above */ ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(data, change->snapshot, sizeof(SnapshotData)); + memcpy(data, snap, sizeof(SnapshotData)); data += sizeof(SnapshotData); - if (change->snapshot->xcnt) + if (snap->xcnt) { - memcpy(data, change->snapshot->xip, - sizeof(TransactionId) + change->snapshot->xcnt); - data += sizeof(TransactionId) + change->snapshot->xcnt; + memcpy(data, snap->xip, + sizeof(TransactionId) + snap->xcnt); + data += sizeof(TransactionId) + snap->xcnt; } - if (change->snapshot->subxcnt) + if (snap->subxcnt) { - memcpy(data, change->snapshot->subxip, - sizeof(TransactionId) + change->snapshot->subxcnt); - data += sizeof(TransactionId) + change->snapshot->subxcnt; + memcpy(data, snap->subxip, + sizeof(TransactionId) + snap->subxcnt); + data += sizeof(TransactionId) + snap->subxcnt; } break; } @@ -2116,7 +2097,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, txn->xid))); } - Assert(ondisk->change.action_internal == change->action_internal); + Assert(ondisk->change.action == change->action); } /* @@ -2271,56 +2252,60 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, data += sizeof(ReorderBufferDiskChange); /* restore individual stuff */ - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: + case REORDER_BUFFER_CHANGE_INSERT: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: + case REORDER_BUFFER_CHANGE_UPDATE: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: - if (change->tp.newtuple) + case REORDER_BUFFER_CHANGE_DELETE: + if (change->data.tp.newtuple) { Size len = offsetof(ReorderBufferTupleBuf, data) +((ReorderBufferTupleBuf *) data)->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); - change->tp.newtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->tp.newtuple, data, len); - change->tp.newtuple->tuple.t_data = &change->tp.newtuple->header; - + change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); + memcpy(change->data.tp.newtuple, data, len); + change->data.tp.newtuple->tuple.t_data = + &change->data.tp.newtuple->header; data += len; } - if (change->tp.oldtuple) + if (change->data.tp.oldtuple) { Size len = offsetof(ReorderBufferTupleBuf, data) +((ReorderBufferTupleBuf *) data)->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); - change->tp.oldtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->tp.oldtuple, data, len); - change->tp.oldtuple->tuple.t_data = &change->tp.oldtuple->header; + change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); + memcpy(change->data.tp.oldtuple, data, len); + change->data.tp.oldtuple->tuple.t_data = + &change->data.tp.oldtuple->header; data += len; } break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { - Snapshot oldsnap = (Snapshot) data; - Size size = sizeof(SnapshotData) + - sizeof(TransactionId) * oldsnap->xcnt + - sizeof(TransactionId) * (oldsnap->subxcnt + 0) - ; - - Assert(change->snapshot != NULL); - - change->snapshot = MemoryContextAllocZero(rb->context, size); - - memcpy(change->snapshot, data, size); - change->snapshot->xip = (TransactionId *) - (((char *) change->snapshot) + sizeof(SnapshotData)); - change->snapshot->subxip = - change->snapshot->xip + change->snapshot->xcnt + 0; - change->snapshot->copied = true; + Snapshot oldsnap; + Snapshot newsnap; + Size size; + + oldsnap = (Snapshot) data; + + size = sizeof(SnapshotData) + + sizeof(TransactionId) * oldsnap->xcnt + + sizeof(TransactionId) * (oldsnap->subxcnt + 0); + + change->data.snapshot = MemoryContextAllocZero(rb->context, size); + + newsnap = change->data.snapshot; + + memcpy(newsnap, data, size); + newsnap->xip = (TransactionId *) + (((char *) newsnap) + sizeof(SnapshotData)); + newsnap->subxip = newsnap->xip + newsnap->xcnt; + newsnap->copied = true; break; } /* the base struct contains all the data, easy peasy */ @@ -2464,6 +2449,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { ReorderBufferToastEnt *ent; + ReorderBufferTupleBuf *newtup; bool found; int32 chunksize; bool isnull; @@ -2477,9 +2463,10 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(IsToastRelation(relation)); - chunk_id = DatumGetObjectId(fastgetattr(&change->tp.newtuple->tuple, 1, desc, &isnull)); + newtup = change->data.tp.newtuple; + chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull)); Assert(!isnull); - chunk_seq = DatumGetInt32(fastgetattr(&change->tp.newtuple->tuple, 2, desc, &isnull)); + chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull)); Assert(!isnull); ent = (ReorderBufferToastEnt *) @@ -2505,7 +2492,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d", chunk_seq, chunk_id, ent->last_chunk_seq + 1); - chunk = DatumGetPointer(fastgetattr(&change->tp.newtuple->tuple, 3, desc, &isnull)); + chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull)); Assert(!isnull); /* calculate size so we can allocate the right size at once later */ @@ -2539,10 +2526,11 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Datum *attrs; bool *isnull; bool *free; - HeapTuple newtup; + HeapTuple tmphtup; Relation toast_rel; TupleDesc toast_desc; MemoryContext oldcontext; + ReorderBufferTupleBuf *newtup; /* no toast tuples changed */ if (txn->toast_hash == NULL) @@ -2551,7 +2539,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, oldcontext = MemoryContextSwitchTo(rb->context); /* we should only have toast tuples in an INSERT or UPDATE */ - Assert(change->tp.newtuple); + Assert(change->data.tp.newtuple); desc = RelationGetDescr(relation); @@ -2563,8 +2551,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, isnull = palloc0(sizeof(bool) * desc->natts); free = palloc0(sizeof(bool) * desc->natts); - heap_deform_tuple(&change->tp.newtuple->tuple, desc, - attrs, isnull); + newtup = change->data.tp.newtuple; + + heap_deform_tuple(&newtup->tuple, desc, attrs, isnull); for (natt = 0; natt < desc->natts; natt++) { @@ -2628,10 +2617,14 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_foreach(it, &ent->chunks) { bool isnull; - ReorderBufferTupleBuf *tup = - dlist_container(ReorderBufferChange, node, it.cur)->tp.newtuple; - Pointer chunk = - DatumGetPointer(fastgetattr(&tup->tuple, 3, toast_desc, &isnull)); + ReorderBufferChange *cchange; + ReorderBufferTupleBuf *ctup; + Pointer chunk; + + cchange = dlist_container(ReorderBufferChange, node, it.cur); + ctup = cchange->data.tp.newtuple; + chunk = DatumGetPointer( + fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); Assert(!isnull); Assert(!VARATT_IS_EXTERNAL(chunk)); @@ -2665,21 +2658,19 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, * passed to the output plugin. We can't directly heap_fill_tuple() into * the tuplebuf because attrs[] will point back into the current content. */ - newtup = heap_form_tuple(desc, attrs, isnull); - Assert(change->tp.newtuple->tuple.t_len <= MaxHeapTupleSize); - Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data); + tmphtup = heap_form_tuple(desc, attrs, isnull); + Assert(newtup->tuple.t_len <= MaxHeapTupleSize); + Assert(&newtup->header == newtup->tuple.t_data); - memcpy(change->tp.newtuple->tuple.t_data, - newtup->t_data, - newtup->t_len); - change->tp.newtuple->tuple.t_len = newtup->t_len; + memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); + newtup->tuple.t_len = tmphtup->t_len; /* * free resources we won't further need, more persistent stuff will be * free'd in ReorderBufferToastReset(). */ RelationClose(toast_rel); - pfree(newtup); + pfree(tmphtup); for (natt = 0; natt < desc->natts; natt++) { if (free[natt]) diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 01eabfb7be7..04ff002990d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -32,12 +32,22 @@ typedef struct ReorderBufferTupleBuf char data[MaxHeapTupleSize]; } ReorderBufferTupleBuf; -/* types of the change passed to a 'change' callback */ +/* + * Types of the change passed to a 'change' callback. + * + * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds + * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE + * changes. Users of the decoding facilities will never see changes with + * *_INTERNAL_* actions. + */ enum ReorderBufferChangeType { REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, - REORDER_BUFFER_CHANGE_DELETE + REORDER_BUFFER_CHANGE_DELETE, + REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, + REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, + REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID }; /* @@ -51,13 +61,8 @@ typedef struct ReorderBufferChange { XLogRecPtr lsn; - /* type of change */ - union - { - enum ReorderBufferChangeType action; - /* do not leak internal enum values to the outside */ - int action_internal; - }; + /* The type of change. */ + enum ReorderBufferChangeType action; /* * Context data for the change, which part of the union is valid depends @@ -65,7 +70,7 @@ typedef struct ReorderBufferChange */ union { - /* old, new tuples when action == *_INSERT|UPDATE|DELETE */ + /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */ struct { /* relation that has been changed */ @@ -76,13 +81,19 @@ typedef struct ReorderBufferChange ReorderBufferTupleBuf *newtuple; } tp; - /* new snapshot */ + /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */ Snapshot snapshot; - /* new command id for existing snapshot in a catalog changing tx */ + /* + * New command id for existing snapshot in a catalog changing tx. Set + * when action == *_INTERNAL_COMMAND_ID. + */ CommandId command_id; - /* new cid mapping for catalog changing transaction */ + /* + * New cid mapping for catalog changing transaction, set when action + * == *_INTERNAL_TUPLECID. + */ struct { RelFileNode node; @@ -91,7 +102,7 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; - }; + } data; /* * While in use this is how a change is linked into a transactions, |