aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/stream.out19
-rw-r--r--contrib/test_decoding/sql/stream.sql15
-rw-r--r--src/backend/replication/logical/reorderbuffer.c32
3 files changed, 62 insertions, 4 deletions
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 4ab2d47bf8d..a76f77601e2 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
committing streamed transaction
(17 rows)
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ count
+-------
+ 5
+(1 row)
+
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 4feec62972a..7f43f0c2ab7 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 00a8327e771..b3139c41e2b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
+ /* All changes must be deallocated */
+ Assert(txn->size == 0);
+
pfree(txn);
}
@@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
bool found;
dlist_mutable_iter iter;
+ Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
+ /*
+ * Instead of updating the memory counter for individual changes,
+ * we sum up the size of memory to free so we can update the memory
+ * counter all together below. This saves costs of maintaining
+ * the max-heap.
+ */
+ mem_freed += ReorderBufferChangeSize(change);
+
ReorderBufferReturnChange(rb, change, false);
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
+
/*
* Cleanup the tuplecids we stored for decoding catalog snapshot access.
* They are always stored in the toplevel transaction.
@@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
- /* Update the memory counter */
- ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
-
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
}
@@ -1616,6 +1628,7 @@ static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
+ Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* remove the change from it's containing list */
dlist_delete(&change->node);
+ /*
+ * Instead of updating the memory counter for individual changes,
+ * we sum up the size of memory to free so we can update the memory
+ * counter all together below. This saves costs of maintaining
+ * the max-heap.
+ */
+ mem_freed += ReorderBufferChangeSize(change);
+
ReorderBufferReturnChange(rb, change, false);
}
/* Update the memory counter */
- ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
/*
* Mark the transaction as streamed.
@@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
rb->stream_stop(rb, txn, last_lsn);
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
}
+
+ /* All changes must be deallocated */
+ Assert(txn->size == 0);
}
/*