aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/test_decoding.c18
-rw-r--r--src/backend/replication/logical/decode.c28
-rw-r--r--src/backend/replication/logical/reorderbuffer.c283
-rw-r--r--src/include/replication/reorderbuffer.h39
4 files changed, 186 insertions, 182 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ea463fb2e7c..e356c7ca675 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -358,43 +358,45 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
appendStringInfoString(ctx->out, " INSERT:");
- if (change->tp.newtuple == NULL)
+ if (change->data.tp.newtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
tuple_to_stringinfo(ctx->out, tupdesc,
- &change->tp.newtuple->tuple,
+ &change->data.tp.newtuple->tuple,
false);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
appendStringInfoString(ctx->out, " UPDATE:");
- if (change->tp.oldtuple != NULL)
+ if (change->data.tp.oldtuple != NULL)
{
appendStringInfoString(ctx->out, " old-key:");
tuple_to_stringinfo(ctx->out, tupdesc,
- &change->tp.oldtuple->tuple,
+ &change->data.tp.oldtuple->tuple,
true);
appendStringInfoString(ctx->out, " new-tuple:");
}
- if (change->tp.newtuple == NULL)
+ if (change->data.tp.newtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
else
tuple_to_stringinfo(ctx->out, tupdesc,
- &change->tp.newtuple->tuple,
+ &change->data.tp.newtuple->tuple,
false);
break;
case REORDER_BUFFER_CHANGE_DELETE:
appendStringInfoString(ctx->out, " DELETE:");
/* if there was no PK, we only know that a delete happened */
- if (change->tp.oldtuple == NULL)
+ if (change->data.tp.oldtuple == NULL)
appendStringInfoString(ctx->out, " (no-tuple-data)");
/* In DELETE, only the replica identity is present; display that */
else
tuple_to_stringinfo(ctx->out, tupdesc,
- &change->tp.oldtuple->tuple,
+ &change->data.tp.oldtuple->tuple,
true);
break;
+ default:
+ Assert(false);
}
MemoryContextSwitchTo(old);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index af3948e8a4a..414cfa95586 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -586,17 +586,17 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
- memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
- change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
r->xl_len - SizeOfHeapInsert,
- change->tp.newtuple);
+ change->data.tp.newtuple);
}
ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
@@ -626,7 +626,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
- memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
data = (char *) &xlhdr->header;
@@ -634,11 +634,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
- change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
DecodeXLogTuple(data,
xlhdr->t_len + SizeOfHeapHeader,
- change->tp.newtuple);
+ change->data.tp.newtuple);
/* skip over the rest of the tuple header */
data += SizeOfHeapHeader;
/* skip over the tuple data */
@@ -648,10 +648,10 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
xlhdr = (xl_heap_header_len *) data;
- change->tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
DecodeXLogTuple((char *) &xlhdr->header,
xlhdr->t_len + SizeOfHeapHeader,
- change->tp.oldtuple);
+ change->data.tp.oldtuple);
data = (char *) &xlhdr->header;
data += SizeOfHeapHeader;
data += xlhdr->t_len;
@@ -681,18 +681,18 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
- memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
/* old primary key stored */
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
r->xl_len - SizeOfHeapDelete,
- change->tp.oldtuple);
+ change->data.tp.oldtuple);
}
ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
}
@@ -735,7 +735,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
- memcpy(&change->tp.relnode, &xlrec->node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
/*
* CONTAINS_NEW_TUPLE will always be set currently as multi_insert
@@ -746,9 +746,9 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- change->tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- tuple = change->tp.newtuple;
+ tuple = change->data.tp.newtuple;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ad560345b40..4493930eda0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -80,26 +80,6 @@
#include "utils/relfilenodemap.h"
#include "utils/tqual.h"
-/*
- * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
- * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
- * changes. We don't want to leak those internal values to external users
- * though (they would just use switch()...default:) because that would make it
- * harder to add to new user visible values.
- *
- * This needs to be synchronized with ReorderBufferChangeType! Adjust the
- * StaticAssertExpr's in ReorderBufferAllocate if you add anything!
- */
-typedef enum
-{
- REORDER_BUFFER_CHANGE_INTERNAL_INSERT,
- REORDER_BUFFER_CHANGE_INTERNAL_UPDATE,
- REORDER_BUFFER_CHANGE_INTERNAL_DELETE,
- REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
- REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
- REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
-} ReorderBufferChangeTypeInternal;
-
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@@ -255,10 +235,6 @@ ReorderBufferAllocate(void)
HASHCTL hash_ctl;
MemoryContext new_ctx;
- StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_INSERT == (int) REORDER_BUFFER_CHANGE_INSERT, "out of sync enums");
- StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_UPDATE == (int) REORDER_BUFFER_CHANGE_UPDATE, "out of sync enums");
- StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_DELETE == (int) REORDER_BUFFER_CHANGE_DELETE, "out of sync enums");
-
/* allocate memory in own context, to have better accountability */
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
"ReorderBuffer",
@@ -427,28 +403,28 @@ void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
/* free contained data */
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
- if (change->tp.newtuple)
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (change->data.tp.newtuple)
{
- ReorderBufferReturnTupleBuf(rb, change->tp.newtuple);
- change->tp.newtuple = NULL;
+ ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
+ change->data.tp.newtuple = NULL;
}
- if (change->tp.oldtuple)
+ if (change->data.tp.oldtuple)
{
- ReorderBufferReturnTupleBuf(rb, change->tp.oldtuple);
- change->tp.oldtuple = NULL;
+ ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
+ change->data.tp.oldtuple = NULL;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
- if (change->snapshot)
+ if (change->data.snapshot)
{
- ReorderBufferFreeSnap(rb, change->snapshot);
- change->snapshot = NULL;
+ ReorderBufferFreeSnap(rb, change->data.snapshot);
+ change->data.snapshot = NULL;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1086,7 +1062,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferChange *change;
change = dlist_container(ReorderBufferChange, node, iter.cur);
- Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
ReorderBufferReturnChange(rb, change);
}
@@ -1161,14 +1137,14 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
change = dlist_container(ReorderBufferChange, node, iter.cur);
- Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
/* be careful about padding */
memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
- key.relnode = change->tuplecid.node;
+ key.relnode = change->data.tuplecid.node;
- ItemPointerCopy(&change->tuplecid.tid,
+ ItemPointerCopy(&change->data.tuplecid.tid,
&key.tid);
ent = (ReorderBufferTupleCidEnt *)
@@ -1178,22 +1154,22 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
if (!found)
{
- ent->cmin = change->tuplecid.cmin;
- ent->cmax = change->tuplecid.cmax;
- ent->combocid = change->tuplecid.combocid;
+ ent->cmin = change->data.tuplecid.cmin;
+ ent->cmax = change->data.tuplecid.cmax;
+ ent->combocid = change->data.tuplecid.combocid;
}
else
{
- Assert(ent->cmin == change->tuplecid.cmin);
+ Assert(ent->cmin == change->data.tuplecid.cmin);
Assert(ent->cmax == InvalidCommandId ||
- ent->cmax == change->tuplecid.cmax);
+ ent->cmax == change->data.tuplecid.cmax);
/*
* if the tuple got valid in this transaction and now got deleted
* we already have a valid cmin stored. The cmax will be
* InvalidCommandId though.
*/
- ent->cmax = change->tuplecid.cmax;
+ ent->cmax = change->data.tuplecid.cmax;
}
}
}
@@ -1367,33 +1343,33 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
Relation relation = NULL;
Oid reloid;
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
Assert(snapshot_now);
- reloid = RelidByRelfilenode(change->tp.relnode.spcNode,
- change->tp.relnode.relNode);
+ reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
+ change->data.tp.relnode.relNode);
/*
* Catalog tuple without data, emitted while catalog was
* in the process of being rewritten.
*/
if (reloid == InvalidOid &&
- change->tp.newtuple == NULL &&
- change->tp.oldtuple == NULL)
+ change->data.tp.newtuple == NULL &&
+ change->data.tp.oldtuple == NULL)
continue;
else if (reloid == InvalidOid)
elog(ERROR, "could not lookup relation %s",
- relpathperm(change->tp.relnode, MAIN_FORKNUM));
+ relpathperm(change->data.tp.relnode, MAIN_FORKNUM));
relation = RelationIdGetRelation(reloid);
if (relation == NULL)
elog(ERROR, "could open relation descriptor %s",
- relpathperm(change->tp.relnode, MAIN_FORKNUM));
+ relpathperm(change->data.tp.relnode, MAIN_FORKNUM));
if (RelationIsLogicallyLogged(relation))
{
@@ -1440,7 +1416,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferFreeSnap(rb, snapshot_now);
snapshot_now =
- ReorderBufferCopySnap(rb, change->snapshot,
+ ReorderBufferCopySnap(rb, change->data.snapshot,
txn, command_id);
}
/*
@@ -1448,15 +1424,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
* free. We could introduce refcounting for that, but for
* now this seems infrequent enough not to care.
*/
- else if (change->snapshot->copied)
+ else if (change->data.snapshot->copied)
{
snapshot_now =
- ReorderBufferCopySnap(rb, change->snapshot,
+ ReorderBufferCopySnap(rb, change->data.snapshot,
txn, command_id);
}
else
{
- snapshot_now = change->snapshot;
+ snapshot_now = change->data.snapshot;
}
@@ -1465,11 +1441,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
break;
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
- Assert(change->command_id != InvalidCommandId);
+ Assert(change->data.command_id != InvalidCommandId);
- if (command_id < change->command_id)
+ if (command_id < change->data.command_id)
{
- command_id = change->command_id;
+ command_id = change->data.command_id;
if (!snapshot_now->copied)
{
@@ -1712,8 +1688,8 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferChange *change = ReorderBufferGetChange(rb);
- change->snapshot = snap;
- change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
+ change->data.snapshot = snap;
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
ReorderBufferQueueChange(rb, xid, lsn, change);
}
@@ -1752,8 +1728,8 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
{
ReorderBufferChange *change = ReorderBufferGetChange(rb);
- change->command_id = cid;
- change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
+ change->data.command_id = cid;
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
ReorderBufferQueueChange(rb, xid, lsn, change);
}
@@ -1773,13 +1749,13 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- change->tuplecid.node = node;
- change->tuplecid.tid = tid;
- change->tuplecid.cmin = cmin;
- change->tuplecid.cmax = cmax;
- change->tuplecid.combocid = combocid;
+ change->data.tuplecid.node = node;
+ change->data.tuplecid.tid = tid;
+ change->data.tuplecid.cmin = cmin;
+ change->data.tuplecid.cmax = cmax;
+ change->data.tuplecid.combocid = combocid;
change->lsn = lsn;
- change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
dlist_push_tail(&txn->tuplecids, &change->node);
txn->ntuplecids++;
@@ -2017,26 +1993,30 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+ case REORDER_BUFFER_CHANGE_INSERT:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
+ case REORDER_BUFFER_CHANGE_UPDATE:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
+ case REORDER_BUFFER_CHANGE_DELETE:
{
char *data;
+ ReorderBufferTupleBuf *oldtup, *newtup;
Size oldlen = 0;
Size newlen = 0;
- if (change->tp.oldtuple)
+ oldtup = change->data.tp.oldtuple;
+ newtup = change->data.tp.newtuple;
+
+ if (oldtup)
oldlen = offsetof(ReorderBufferTupleBuf, data)
- + change->tp.oldtuple->tuple.t_len
+ + oldtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
- if (change->tp.newtuple)
+ if (newtup)
newlen = offsetof(ReorderBufferTupleBuf, data)
- + change->tp.newtuple->tuple.t_len
+ + newtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
sz += oldlen;
@@ -2051,26 +2031,27 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen)
{
- memcpy(data, change->tp.oldtuple, oldlen);
+ memcpy(data, oldtup, oldlen);
data += oldlen;
- Assert(&change->tp.oldtuple->header == change->tp.oldtuple->tuple.t_data);
}
if (newlen)
{
- memcpy(data, change->tp.newtuple, newlen);
+ memcpy(data, newtup, newlen);
data += newlen;
- Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data);
}
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
+ Snapshot snap;
char *data;
+ snap = change->data.snapshot;
+
sz += sizeof(SnapshotData) +
- sizeof(TransactionId) * change->snapshot->xcnt +
- sizeof(TransactionId) * change->snapshot->subxcnt
+ sizeof(TransactionId) * snap->xcnt +
+ sizeof(TransactionId) * snap->subxcnt
;
/* make sure we have enough space */
@@ -2079,21 +2060,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* might have been reallocated above */
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- memcpy(data, change->snapshot, sizeof(SnapshotData));
+ memcpy(data, snap, sizeof(SnapshotData));
data += sizeof(SnapshotData);
- if (change->snapshot->xcnt)
+ if (snap->xcnt)
{
- memcpy(data, change->snapshot->xip,
- sizeof(TransactionId) + change->snapshot->xcnt);
- data += sizeof(TransactionId) + change->snapshot->xcnt;
+ memcpy(data, snap->xip,
+ sizeof(TransactionId) + snap->xcnt);
+ data += sizeof(TransactionId) + snap->xcnt;
}
- if (change->snapshot->subxcnt)
+ if (snap->subxcnt)
{
- memcpy(data, change->snapshot->subxip,
- sizeof(TransactionId) + change->snapshot->subxcnt);
- data += sizeof(TransactionId) + change->snapshot->subxcnt;
+ memcpy(data, snap->subxip,
+ sizeof(TransactionId) + snap->subxcnt);
+ data += sizeof(TransactionId) + snap->subxcnt;
}
break;
}
@@ -2116,7 +2097,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
txn->xid)));
}
- Assert(ondisk->change.action_internal == change->action_internal);
+ Assert(ondisk->change.action == change->action);
}
/*
@@ -2271,56 +2252,60 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
data += sizeof(ReorderBufferDiskChange);
/* restore individual stuff */
- switch ((ReorderBufferChangeTypeInternal) change->action_internal)
+ switch (change->action)
{
- case REORDER_BUFFER_CHANGE_INTERNAL_INSERT:
+ case REORDER_BUFFER_CHANGE_INSERT:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE:
+ case REORDER_BUFFER_CHANGE_UPDATE:
/* fall through */
- case REORDER_BUFFER_CHANGE_INTERNAL_DELETE:
- if (change->tp.newtuple)
+ case REORDER_BUFFER_CHANGE_DELETE:
+ if (change->data.tp.newtuple)
{
Size len = offsetof(ReorderBufferTupleBuf, data)
+((ReorderBufferTupleBuf *) data)->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
- change->tp.newtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->tp.newtuple, data, len);
- change->tp.newtuple->tuple.t_data = &change->tp.newtuple->header;
-
+ change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
+ memcpy(change->data.tp.newtuple, data, len);
+ change->data.tp.newtuple->tuple.t_data =
+ &change->data.tp.newtuple->header;
data += len;
}
- if (change->tp.oldtuple)
+ if (change->data.tp.oldtuple)
{
Size len = offsetof(ReorderBufferTupleBuf, data)
+((ReorderBufferTupleBuf *) data)->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
- change->tp.oldtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->tp.oldtuple, data, len);
- change->tp.oldtuple->tuple.t_data = &change->tp.oldtuple->header;
+ change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
+ memcpy(change->data.tp.oldtuple, data, len);
+ change->data.tp.oldtuple->tuple.t_data =
+ &change->data.tp.oldtuple->header;
data += len;
}
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
- Snapshot oldsnap = (Snapshot) data;
- Size size = sizeof(SnapshotData) +
- sizeof(TransactionId) * oldsnap->xcnt +
- sizeof(TransactionId) * (oldsnap->subxcnt + 0)
- ;
-
- Assert(change->snapshot != NULL);
-
- change->snapshot = MemoryContextAllocZero(rb->context, size);
-
- memcpy(change->snapshot, data, size);
- change->snapshot->xip = (TransactionId *)
- (((char *) change->snapshot) + sizeof(SnapshotData));
- change->snapshot->subxip =
- change->snapshot->xip + change->snapshot->xcnt + 0;
- change->snapshot->copied = true;
+ Snapshot oldsnap;
+ Snapshot newsnap;
+ Size size;
+
+ oldsnap = (Snapshot) data;
+
+ size = sizeof(SnapshotData) +
+ sizeof(TransactionId) * oldsnap->xcnt +
+ sizeof(TransactionId) * (oldsnap->subxcnt + 0);
+
+ change->data.snapshot = MemoryContextAllocZero(rb->context, size);
+
+ newsnap = change->data.snapshot;
+
+ memcpy(newsnap, data, size);
+ newsnap->xip = (TransactionId *)
+ (((char *) newsnap) + sizeof(SnapshotData));
+ newsnap->subxip = newsnap->xip + newsnap->xcnt;
+ newsnap->copied = true;
break;
}
/* the base struct contains all the data, easy peasy */
@@ -2464,6 +2449,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
ReorderBufferToastEnt *ent;
+ ReorderBufferTupleBuf *newtup;
bool found;
int32 chunksize;
bool isnull;
@@ -2477,9 +2463,10 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Assert(IsToastRelation(relation));
- chunk_id = DatumGetObjectId(fastgetattr(&change->tp.newtuple->tuple, 1, desc, &isnull));
+ newtup = change->data.tp.newtuple;
+ chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
Assert(!isnull);
- chunk_seq = DatumGetInt32(fastgetattr(&change->tp.newtuple->tuple, 2, desc, &isnull));
+ chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
Assert(!isnull);
ent = (ReorderBufferToastEnt *)
@@ -2505,7 +2492,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
chunk_seq, chunk_id, ent->last_chunk_seq + 1);
- chunk = DatumGetPointer(fastgetattr(&change->tp.newtuple->tuple, 3, desc, &isnull));
+ chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
Assert(!isnull);
/* calculate size so we can allocate the right size at once later */
@@ -2539,10 +2526,11 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
Datum *attrs;
bool *isnull;
bool *free;
- HeapTuple newtup;
+ HeapTuple tmphtup;
Relation toast_rel;
TupleDesc toast_desc;
MemoryContext oldcontext;
+ ReorderBufferTupleBuf *newtup;
/* no toast tuples changed */
if (txn->toast_hash == NULL)
@@ -2551,7 +2539,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
oldcontext = MemoryContextSwitchTo(rb->context);
/* we should only have toast tuples in an INSERT or UPDATE */
- Assert(change->tp.newtuple);
+ Assert(change->data.tp.newtuple);
desc = RelationGetDescr(relation);
@@ -2563,8 +2551,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
isnull = palloc0(sizeof(bool) * desc->natts);
free = palloc0(sizeof(bool) * desc->natts);
- heap_deform_tuple(&change->tp.newtuple->tuple, desc,
- attrs, isnull);
+ newtup = change->data.tp.newtuple;
+
+ heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
for (natt = 0; natt < desc->natts; natt++)
{
@@ -2628,10 +2617,14 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
dlist_foreach(it, &ent->chunks)
{
bool isnull;
- ReorderBufferTupleBuf *tup =
- dlist_container(ReorderBufferChange, node, it.cur)->tp.newtuple;
- Pointer chunk =
- DatumGetPointer(fastgetattr(&tup->tuple, 3, toast_desc, &isnull));
+ ReorderBufferChange *cchange;
+ ReorderBufferTupleBuf *ctup;
+ Pointer chunk;
+
+ cchange = dlist_container(ReorderBufferChange, node, it.cur);
+ ctup = cchange->data.tp.newtuple;
+ chunk = DatumGetPointer(
+ fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
Assert(!isnull);
Assert(!VARATT_IS_EXTERNAL(chunk));
@@ -2665,21 +2658,19 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
* passed to the output plugin. We can't directly heap_fill_tuple() into
* the tuplebuf because attrs[] will point back into the current content.
*/
- newtup = heap_form_tuple(desc, attrs, isnull);
- Assert(change->tp.newtuple->tuple.t_len <= MaxHeapTupleSize);
- Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data);
+ tmphtup = heap_form_tuple(desc, attrs, isnull);
+ Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
+ Assert(&newtup->header == newtup->tuple.t_data);
- memcpy(change->tp.newtuple->tuple.t_data,
- newtup->t_data,
- newtup->t_len);
- change->tp.newtuple->tuple.t_len = newtup->t_len;
+ memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
+ newtup->tuple.t_len = tmphtup->t_len;
/*
* free resources we won't further need, more persistent stuff will be
* free'd in ReorderBufferToastReset().
*/
RelationClose(toast_rel);
- pfree(newtup);
+ pfree(tmphtup);
for (natt = 0; natt < desc->natts; natt++)
{
if (free[natt])
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 01eabfb7be7..04ff002990d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -32,12 +32,22 @@ typedef struct ReorderBufferTupleBuf
char data[MaxHeapTupleSize];
} ReorderBufferTupleBuf;
-/* types of the change passed to a 'change' callback */
+/*
+ * Types of the change passed to a 'change' callback.
+ *
+ * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
+ * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
+ * changes. Users of the decoding facilities will never see changes with
+ * *_INTERNAL_* actions.
+ */
enum ReorderBufferChangeType
{
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
- REORDER_BUFFER_CHANGE_DELETE
+ REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
+ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
+ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
};
/*
@@ -51,13 +61,8 @@ typedef struct ReorderBufferChange
{
XLogRecPtr lsn;
- /* type of change */
- union
- {
- enum ReorderBufferChangeType action;
- /* do not leak internal enum values to the outside */
- int action_internal;
- };
+ /* The type of change. */
+ enum ReorderBufferChangeType action;
/*
* Context data for the change, which part of the union is valid depends
@@ -65,7 +70,7 @@ typedef struct ReorderBufferChange
*/
union
{
- /* old, new tuples when action == *_INSERT|UPDATE|DELETE */
+ /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
struct
{
/* relation that has been changed */
@@ -76,13 +81,19 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
- /* new snapshot */
+ /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
Snapshot snapshot;
- /* new command id for existing snapshot in a catalog changing tx */
+ /*
+ * New command id for existing snapshot in a catalog changing tx. Set
+ * when action == *_INTERNAL_COMMAND_ID.
+ */
CommandId command_id;
- /* new cid mapping for catalog changing transaction */
+ /*
+ * New cid mapping for catalog changing transaction, set when action
+ * == *_INTERNAL_TUPLECID.
+ */
struct
{
RelFileNode node;
@@ -91,7 +102,7 @@ typedef struct ReorderBufferChange
CommandId cmax;
CommandId combocid;
} tuplecid;
- };
+ } data;
/*
* While in use this is how a change is linked into a transactions,