aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c82
1 files changed, 48 insertions, 34 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9610b6ce773..0f7ff1501be 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -625,13 +625,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
+ Size tuplelen = r->xl_len - SizeOfHeapInsert;
+
Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
- r->xl_len - SizeOfHeapInsert,
- change->data.tp.newtuple);
+ tuplelen, change->data.tp.newtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -650,7 +652,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogRecord *r = &buf->record;
xl_heap_update *xlrec;
- xl_heap_header_len xlhdr;
ReorderBufferChange *change;
char *data;
@@ -669,16 +670,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
+ Size tuplelen;
+ xl_heap_header_len xlhdr;
+
Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
memcpy(&xlhdr, data, sizeof(xlhdr));
data += offsetof(xl_heap_header_len, header);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ tuplelen = xlhdr.t_len + SizeOfHeapHeader;
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.newtuple);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(data, tuplelen, change->data.tp.newtuple);
/* skip over the rest of the tuple header */
data += SizeOfHeapHeader;
/* skip over the tuple data */
@@ -687,14 +692,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
+ Size tuplelen;
+ xl_heap_header_len xlhdr;
+
memcpy(&xlhdr, data, sizeof(xlhdr));
data += offsetof(xl_heap_header_len, header);
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ tuplelen = xlhdr.t_len + SizeOfHeapHeader;
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.oldtuple);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(data, tuplelen, change->data.tp.oldtuple);
#ifdef NOT_USED
data += SizeOfHeapHeader;
data += xlhdr.t_len;
@@ -732,13 +741,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
+ Size len = r->xl_len - SizeOfHeapDelete;
+
Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- r->xl_len - SizeOfHeapDelete,
- change->data.tp.oldtuple);
+ len, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -795,37 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ HeapTupleHeader header;
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
- data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
- datalen = xlhdr->datalen;
-
/*
* We can only figure this out after reassembling the
* transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->header;
+
tuple->tuple.t_len = datalen
+ offsetof(HeapTupleHeaderData, t_bits);
- memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+ memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
- memcpy((char *) &tuple->header
- + offsetof(HeapTupleHeaderData, t_bits),
+ memcpy((char *) tuple->tuple.t_data + offsetof(HeapTupleHeaderData, t_bits),
(char *) data,
datalen);
data += datalen;
- tuple->header.t_infomask = xlhdr->t_infomask;
- tuple->header.t_infomask2 = xlhdr->t_infomask2;
- tuple->header.t_hoff = xlhdr->t_hoff;
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
}
/*
@@ -856,31 +870,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
Assert(datalen >= 0);
- Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->header;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfHeapHeader);
- memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+ memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
- memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+ memcpy(((char *) tuple->tuple.t_data) + offsetof(HeapTupleHeaderData, t_bits),
data + SizeOfHeapHeader,
datalen);
- tuple->header.t_infomask = xlhdr.t_infomask;
- tuple->header.t_infomask2 = xlhdr.t_infomask2;
- tuple->header.t_hoff = xlhdr.t_hoff;
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
}