diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 82 |
1 files changed, 48 insertions, 34 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 9610b6ce773..0f7ff1501be 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -625,13 +625,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { + Size tuplelen = r->xl_len - SizeOfHeapInsert; + Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader)); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert, - r->xl_len - SizeOfHeapInsert, - change->data.tp.newtuple); + tuplelen, change->data.tp.newtuple); } change->data.tp.clear_toast_afterwards = true; @@ -650,7 +652,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { XLogRecord *r = &buf->record; xl_heap_update *xlrec; - xl_heap_header_len xlhdr; ReorderBufferChange *change; char *data; @@ -669,16 +670,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { + Size tuplelen; + xl_heap_header_len xlhdr; + Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen)); memcpy(&xlhdr, data, sizeof(xlhdr)); data += offsetof(xl_heap_header_len, header); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + tuplelen = xlhdr.t_len + SizeOfHeapHeader; - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.newtuple); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeXLogTuple(data, tuplelen, change->data.tp.newtuple); /* skip over the rest of the tuple header */ data += SizeOfHeapHeader; /* skip over the tuple data */ @@ -687,14 +692,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { + Size tuplelen; + xl_heap_header_len xlhdr; + memcpy(&xlhdr, data, sizeof(xlhdr)); data += offsetof(xl_heap_header_len, header); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + tuplelen = xlhdr.t_len + SizeOfHeapHeader; - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.oldtuple); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeXLogTuple(data, tuplelen, change->data.tp.oldtuple); #ifdef NOT_USED data += SizeOfHeapHeader; data += xlhdr.t_len; @@ -732,13 +741,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { + Size len = r->xl_len - SizeOfHeapDelete; + Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, len); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - r->xl_len - SizeOfHeapDelete, - change->data.tp.oldtuple); + len, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; @@ -795,37 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + HeapTupleHeader header; + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); + data = ((char *) xlhdr) + SizeOfMultiInsertTuple; + datalen = xlhdr->datalen; + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); tuple = change->data.tp.newtuple; + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); - data = ((char *) xlhdr) + SizeOfMultiInsertTuple; - datalen = xlhdr->datalen; - /* * We can only figure this out after reassembling the * transactions. */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->header; + tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits); - memset(&tuple->header, 0, sizeof(HeapTupleHeaderData)); + memset(header, 0, offsetof(HeapTupleHeaderData, t_bits)); - memcpy((char *) &tuple->header - + offsetof(HeapTupleHeaderData, t_bits), + memcpy((char *) tuple->tuple.t_data + offsetof(HeapTupleHeaderData, t_bits), (char *) data, datalen); data += datalen; - tuple->header.t_infomask = xlhdr->t_infomask; - tuple->header.t_infomask2 = xlhdr->t_infomask2; - tuple->header.t_hoff = xlhdr->t_hoff; + header->t_infomask = xlhdr->t_infomask; + header->t_infomask2 = xlhdr->t_infomask2; + header->t_hoff = xlhdr->t_hoff; } /* @@ -856,31 +870,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) { xl_heap_header xlhdr; int datalen = len - SizeOfHeapHeader; + HeapTupleHeader header; Assert(datalen >= 0); - Assert(datalen <= MaxHeapTupleSize); tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits); + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); /* we can only figure this out after reassembling the transactions */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->header; /* data is not stored aligned, copy to aligned storage */ memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - memset(&tuple->header, 0, sizeof(HeapTupleHeaderData)); + memset(header, 0, offsetof(HeapTupleHeaderData, t_bits)); - memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits), + memcpy(((char *) tuple->tuple.t_data) + offsetof(HeapTupleHeaderData, t_bits), data + SizeOfHeapHeader, datalen); - tuple->header.t_infomask = xlhdr.t_infomask; - tuple->header.t_infomask2 = xlhdr.t_infomask2; - tuple->header.t_hoff = xlhdr.t_hoff; + header->t_infomask = xlhdr.t_infomask; + header->t_infomask2 = xlhdr.t_infomask2; + header->t_hoff = xlhdr.t_hoff; } |