aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2016-03-05 18:02:20 -0800
committerAndres Freund <andres@anarazel.de>2016-03-05 18:02:20 -0800
commit5990a034adfe4b777b07be10bd71730f1868e48d (patch)
tree2d52b3789b550d6679a81a9e2d811eeed1dc553a /src/backend/replication/logical/decode.c
parente76e365be9e7fc30f19d21962461a0046d15ea5d (diff)
downloadpostgresql-5990a034adfe4b777b07be10bd71730f1868e48d.tar.gz
postgresql-5990a034adfe4b777b07be10bd71730f1868e48d.zip
logical decoding: Fix handling of large old tuples with replica identity full.
When decoding the old version of an UPDATE or DELETE change, and if that tuple was bigger than MaxHeapTupleSize, we either Assert'ed out, or failed in more subtle ways in non-assert builds. Normally individual tuples aren't bigger than MaxHeapTupleSize, with big datums toasted. But that's not the case for the old version of a tuple for logical decoding; the replica identity is logged as one piece. With the default replica identity btree limits that to small tuples, but that's not the case for FULL. Change the tuple buffer infrastructure to separate allocate over-large tuples, instead of always going through the slab cache. This unfortunately requires changing the ReorderBufferTupleBuf definition, we need to store the allocated size someplace. To avoid requiring output plugins to recompile, don't store HeapTupleHeaderData directly after HeapTupleData, but point to it via t_data; that leaves rooms for the allocated size. As there's no reason for an output plugin to look at ReorderBufferTupleBuf->t_data.header, remove the field. It was just a minor convenience having it directly accessible. Reported-By: Adam DratwiƄski Discussion: CAKg6ypLd7773AOX4DiOGRwQk1TVOQKhNwjYiVjJnpq8Wo+i62Q@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c57
1 files changed, 33 insertions, 24 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c530fe9ba09..3f8c5e50fba 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -610,7 +610,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Size tuplelen;
char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple);
}
@@ -656,7 +657,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
data = XLogRecGetBlockData(r, 0, &datalen);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
@@ -667,7 +669,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
data = XLogRecGetData(r) + SizeOfHeapUpdate;
datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
@@ -717,13 +720,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */
if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
+ Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete;
+
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- XLogRecGetDataLen(r) - SizeOfHeapDelete,
- change->data.tp.oldtuple);
+ len, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -783,35 +788,39 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ HeapTupleHeader header;
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
- data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
- datalen = xlhdr->datalen;
-
/*
* We can only figure this out after reassembling the
* transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
+
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
data += datalen;
- tuple->t_data.header.t_infomask = xlhdr->t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr->t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr->t_hoff;
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
}
/*
@@ -877,31 +886,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
Assert(datalen >= 0);
- Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->t_data.header;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfHeapHeader);
- memset(&tuple->t_data.header, 0, SizeofHeapTupleHeader);
+ memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) &tuple->t_data.header + SizeofHeapTupleHeader,
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader,
datalen);
- tuple->t_data.header.t_infomask = xlhdr.t_infomask;
- tuple->t_data.header.t_infomask2 = xlhdr.t_infomask2;
- tuple->t_data.header.t_hoff = xlhdr.t_hoff;
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
}