aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/toast.out58
-rw-r--r--contrib/test_decoding/sql/toast.sql37
-rw-r--r--src/backend/replication/logical/decode.c82
-rw-r--r--src/backend/replication/logical/reorderbuffer.c116
-rw-r--r--src/include/replication/reorderbuffer.h15
5 files changed, 236 insertions, 72 deletions
diff --git a/contrib/test_decoding/expected/toast.out b/contrib/test_decoding/expected/toast.out
index 0a850b7acdb..b7bae65ee82 100644
--- a/contrib/test_decoding/expected/toast.out
+++ b/contrib/test_decoding/expected/toast.out
@@ -285,6 +285,64 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot',
COMMIT
(232 rows)
+-- test we can decode "old" tuples bigger than the max heap tuple size correctly
+DROP TABLE IF EXISTS toasted_several;
+NOTICE: table "toasted_several" does not exist, skipping
+CREATE TABLE toasted_several (
+ id serial unique not null,
+ toasted_key text primary key,
+ toasted_col1 text,
+ toasted_col2 text
+);
+ALTER TABLE toasted_several REPLICA IDENTITY FULL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
+INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ regexp_replace
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
+ COMMIT
+(3 rows)
+
+-- test update of a toasted key without changing it
+UPDATE toasted_several SET toasted_col1 = toasted_key;
+UPDATE toasted_several SET toasted_col2 = toasted_col1;
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ regexp_replace
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
+ COMMIT
+ BEGIN
+ table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..432109876543210987654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:null
+ COMMIT
+ BEGIN
+ table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
+ COMMIT
+(9 rows)
+
+/*
+ * update with large tuplebuf, in a transaction large enough to force to spool to disk
+ */
+BEGIN;
+INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
+UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
+DELETE FROM toasted_several WHERE id = 1;
+COMMIT;
+DROP TABLE toasted_several;
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+WHERE data NOT LIKE '%INSERT: %';
+ regexp_replace
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..7654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:unchanged-toast-datum
+ table public.toasted_several: DELETE: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
+ COMMIT
+(4 rows)
+
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--------------------------
diff --git a/contrib/test_decoding/sql/toast.sql b/contrib/test_decoding/sql/toast.sql
index 09293865df9..a333d99abce 100644
--- a/contrib/test_decoding/sql/toast.sql
+++ b/contrib/test_decoding/sql/toast.sql
@@ -260,4 +260,41 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL;
203 untoasted200
\.
SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test we can decode "old" tuples bigger than the max heap tuple size correctly
+DROP TABLE IF EXISTS toasted_several;
+CREATE TABLE toasted_several (
+ id serial unique not null,
+ toasted_key text primary key,
+ toasted_col1 text,
+ toasted_col2 text
+);
+ALTER TABLE toasted_several REPLICA IDENTITY FULL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
+
+INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test update of a toasted key without changing it
+UPDATE toasted_several SET toasted_col1 = toasted_key;
+UPDATE toasted_several SET toasted_col2 = toasted_col1;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+/*
+ * update with large tuplebuf, in a transaction large enough to force to spool to disk
+ */
+BEGIN;
+INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
+UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
+DELETE FROM toasted_several WHERE id = 1;
+COMMIT;
+
+DROP TABLE toasted_several;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+WHERE data NOT LIKE '%INSERT: %';
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9610b6ce773..0f7ff1501be 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -625,13 +625,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
+ Size tuplelen = r->xl_len - SizeOfHeapInsert;
+
Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
- r->xl_len - SizeOfHeapInsert,
- change->data.tp.newtuple);
+ tuplelen, change->data.tp.newtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -650,7 +652,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogRecord *r = &buf->record;
xl_heap_update *xlrec;
- xl_heap_header_len xlhdr;
ReorderBufferChange *change;
char *data;
@@ -669,16 +670,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
+ Size tuplelen;
+ xl_heap_header_len xlhdr;
+
Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
memcpy(&xlhdr, data, sizeof(xlhdr));
data += offsetof(xl_heap_header_len, header);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ tuplelen = xlhdr.t_len + SizeOfHeapHeader;
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.newtuple);
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(data, tuplelen, change->data.tp.newtuple);
/* skip over the rest of the tuple header */
data += SizeOfHeapHeader;
/* skip over the tuple data */
@@ -687,14 +692,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
+ Size tuplelen;
+ xl_heap_header_len xlhdr;
+
memcpy(&xlhdr, data, sizeof(xlhdr));
data += offsetof(xl_heap_header_len, header);
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ tuplelen = xlhdr.t_len + SizeOfHeapHeader;
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.oldtuple);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+
+ DecodeXLogTuple(data, tuplelen, change->data.tp.oldtuple);
#ifdef NOT_USED
data += SizeOfHeapHeader;
data += xlhdr.t_len;
@@ -732,13 +741,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
+ Size len = r->xl_len - SizeOfHeapDelete;
+
Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- r->xl_len - SizeOfHeapDelete,
- change->data.tp.oldtuple);
+ len, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
@@ -795,37 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
+ HeapTupleHeader header;
+
+ xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
+ data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+ datalen = xlhdr->datalen;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
- xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
- data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
- datalen = xlhdr->datalen;
-
/*
* We can only figure this out after reassembling the
* transactions.
*/
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->header;
+
tuple->tuple.t_len = datalen
+ offsetof(HeapTupleHeaderData, t_bits);
- memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+ memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
- memcpy((char *) &tuple->header
- + offsetof(HeapTupleHeaderData, t_bits),
+ memcpy((char *) tuple->tuple.t_data + offsetof(HeapTupleHeaderData, t_bits),
(char *) data,
datalen);
data += datalen;
- tuple->header.t_infomask = xlhdr->t_infomask;
- tuple->header.t_infomask2 = xlhdr->t_infomask2;
- tuple->header.t_hoff = xlhdr->t_hoff;
+ header->t_infomask = xlhdr->t_infomask;
+ header->t_infomask2 = xlhdr->t_infomask2;
+ header->t_hoff = xlhdr->t_hoff;
}
/*
@@ -856,31 +870,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
+ HeapTupleHeader header;
Assert(datalen >= 0);
- Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+ header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid;
- tuple->tuple.t_data = &tuple->header;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
data,
SizeOfHeapHeader);
- memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+ memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
- memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+ memcpy(((char *) tuple->tuple.t_data) + offsetof(HeapTupleHeaderData, t_bits),
data + SizeOfHeapHeader,
datalen);
- tuple->header.t_infomask = xlhdr.t_infomask;
- tuple->header.t_infomask2 = xlhdr.t_infomask2;
- tuple->header.t_hoff = xlhdr.t_hoff;
+ header->t_infomask = xlhdr.t_infomask;
+ header->t_infomask2 = xlhdr.t_infomask2;
+ header->t_hoff = xlhdr.t_hoff;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 22b83ab6ad0..a1479fe949f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -442,27 +442,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
/*
- * Get a unused, possibly preallocated, ReorderBufferTupleBuf
+ * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
+ * least a tuple of size tuple_len (excluding header overhead).
*/
ReorderBufferTupleBuf *
-ReorderBufferGetTupleBuf(ReorderBuffer *rb)
+ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
ReorderBufferTupleBuf *tuple;
+ Size alloc_len;
- /* check the slab cache */
- if (rb->nr_cached_tuplebufs)
+ alloc_len = tuple_len + offsetof(HeapTupleHeaderData, t_bits);
+
+ /*
+ * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
+ * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
+ * tuples generated for oldtuples can be bigger, as they don't have
+ * out-of-line toast columns.
+ */
+ if (alloc_len < MaxHeapTupleSize)
+ alloc_len = MaxHeapTupleSize;
+
+
+ /* if small enough, check the slab cache */
+ if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
{
rb->nr_cached_tuplebufs--;
tuple = slist_container(ReorderBufferTupleBuf, node,
slist_pop_head_node(&rb->cached_tuplebufs));
#ifdef USE_ASSERT_CHECKING
- memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf));
+ memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
+#endif
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
+#ifdef USE_ASSERT_CHECKING
+ memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
#endif
}
else
{
tuple = (ReorderBufferTupleBuf *)
- MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf));
+ MemoryContextAlloc(rb->context,
+ sizeof(ReorderBufferTupleBuf) + alloc_len);
+ tuple->alloc_tuple_size = alloc_len;
+ tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
}
return tuple;
@@ -477,13 +498,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb)
void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{
- /* check whether to put into the slab cache */
- if (rb->nr_cached_tuplebufs < max_cached_tuplebufs)
+ /* check whether to put into the slab cache, oversized tuples never are */
+ if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
+ rb->nr_cached_tuplebufs < max_cached_tuplebufs)
{
rb->nr_cached_tuplebufs++;
slist_push_head(&rb->cached_tuplebufs, &tuple->node);
+ VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
+ VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
}
else
{
@@ -2020,17 +2044,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
newtup = change->data.tp.newtuple;
if (oldtup)
- oldlen = offsetof(ReorderBufferTupleBuf, data)
- +oldtup->tuple.t_len
- - offsetof(HeapTupleHeaderData, t_bits);
+ {
+ sz += sizeof(HeapTupleData);
+ oldlen = oldtup->tuple.t_len;
+ sz += oldlen;
+ }
if (newtup)
- newlen = offsetof(ReorderBufferTupleBuf, data)
- +newtup->tuple.t_len
- - offsetof(HeapTupleHeaderData, t_bits);
-
- sz += oldlen;
- sz += newlen;
+ {
+ sz += sizeof(HeapTupleData);
+ newlen = newtup->tuple.t_len;
+ sz += newlen;
+ }
/* make sure we have enough space */
ReorderBufferSerializeReserve(rb, sz);
@@ -2041,13 +2066,19 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen)
{
- memcpy(data, oldtup, oldlen);
+ memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen;
}
if (newlen)
{
- memcpy(data, newtup, newlen);
+ memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, newtup->tuple.t_data, newlen);
data += newlen;
}
break;
@@ -2268,29 +2299,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, data)
- +((ReorderBufferTupleBuf *) data)->tuple.t_len
- - offsetof(HeapTupleHeaderData, t_bits);
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.oldtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - offsetof(HeapTupleHeaderData, t_bits));
+
+ /* restore ->tuple */
+ memcpy(&change->data.tp.oldtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
- change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.oldtuple, data, len);
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->tuple.t_data =
- &change->data.tp.oldtuple->header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.oldtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
if (change->data.tp.newtuple)
{
- Size len = offsetof(ReorderBufferTupleBuf, data)
- +((ReorderBufferTupleBuf *) data)->tuple.t_len
- - offsetof(HeapTupleHeaderData, t_bits);
+ Size tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.tp.newtuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - offsetof(HeapTupleHeaderData, t_bits));
+
+ /* restore ->tuple */
+ memcpy(&change->data.tp.newtuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
- change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb);
- memcpy(change->data.tp.newtuple, data, len);
+ /* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->tuple.t_data =
- &change->data.tp.newtuple->header;
- data += len;
+ ReorderBufferTupleBufData(change->data.tp.newtuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
}
+
break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
@@ -2667,7 +2715,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
- Assert(&newtup->header == newtup->tuple.t_data);
+ Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->tuple.t_len = tmphtup->t_len;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 00183fa16d6..9092c9b4ff6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -26,12 +26,19 @@ typedef struct ReorderBufferTupleBuf
/* position in preallocated list */
slist_node node;
- /* tuple, stored sequentially */
+ /* tuple header, the interesting bit for users of logical decoding */
HeapTupleData tuple;
- HeapTupleHeaderData header;
- char data[MaxHeapTupleSize];
+
+ /* pre-allocated size of tuple buffer, different from tuple size */
+ Size alloc_tuple_size;
+
+ /* actual tuple data follows */
} ReorderBufferTupleBuf;
+/* pointer to the data stored in a TupleBuf */
+#define ReorderBufferTupleBufData(p) \
+ ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
+
/*
* Types of the change passed to a 'change' callback.
*
@@ -327,7 +334,7 @@ struct ReorderBuffer
ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *);
-ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *);
+ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);