aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMasahiko Sawada <msawada@postgresql.org>2024-08-26 11:00:04 -0700
committerMasahiko Sawada <msawada@postgresql.org>2024-08-26 11:00:04 -0700
commitdbed2e36625de9d4074243f60f48e04b5ed67810 (patch)
treea735130d50c7eb86d06a546ae0dcdb34170ddd2f
parent6749d4aabe74ca37ce351f2e318fe1b3bcf2b71c (diff)
downloadpostgresql-dbed2e36625de9d4074243f60f48e04b5ed67810.tar.gz
postgresql-dbed2e36625de9d4074243f60f48e04b5ed67810.zip
Fix memory counter update in ReorderBuffer.
Commit 5bec1d6bc5e changed the memory usage updates of the ReorderBufferTXN to zero all at once by subtracting txn->size, rather than updating it for each change. However, if TOAST reconstruction data remained in the transaction when freeing it, there were cases where it further subtracted the memory counter from zero, resulting in an assertion failure. This change calculates the memory size for each change and updates the memory usage to precisely the amount that has been freed. Backpatch to v17, where this was introducd. Reviewed-by: Amit Kapila, Shlok Kyal Discussion: https://postgr.es/m/CAD21AoAqkNUvicgKPT_dXzNoOwpPkVTg0QPPxEcWmzT0moCJ1g%40mail.gmail.com Backpatch-through: 17
-rw-r--r--contrib/test_decoding/expected/stream.out19
-rw-r--r--contrib/test_decoding/sql/stream.sql15
-rw-r--r--src/backend/replication/logical/reorderbuffer.c32
3 files changed, 62 insertions, 4 deletions
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 4ab2d47bf8d..a76f77601e2 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -109,6 +109,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
committing streamed transaction
(17 rows)
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ count
+-------
+ 5
+(1 row)
+
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
index 4feec62972a..7f43f0c2ab7 100644
--- a/contrib/test_decoding/sql/stream.sql
+++ b/contrib/test_decoding/sql/stream.sql
@@ -44,5 +44,20 @@ toasted-123456789012345678901234567890123456789012345678901234567890123456789012
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 00a8327e771..b3139c41e2b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Reset the toast hash */
ReorderBufferToastReset(rb, txn);
+ /* All changes must be deallocated */
+ Assert(txn->size == 0);
+
pfree(txn);
}
@@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
bool found;
dlist_mutable_iter iter;
+ Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
+ /*
+ * Instead of updating the memory counter for individual changes,
+ * we sum up the size of memory to free so we can update the memory
+ * counter all together below. This saves costs of maintaining
+ * the max-heap.
+ */
+ mem_freed += ReorderBufferChangeSize(change);
+
ReorderBufferReturnChange(rb, change, false);
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
+
/*
* Cleanup the tuplecids we stored for decoding catalog snapshot access.
* They are always stored in the toplevel transaction.
@@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
- /* Update the memory counter */
- ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
-
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
}
@@ -1616,6 +1628,7 @@ static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
+ Size mem_freed = 0;
/* cleanup subtransactions & their changes */
dlist_foreach_modify(iter, &txn->subtxns)
@@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* remove the change from it's containing list */
dlist_delete(&change->node);
+ /*
+ * Instead of updating the memory counter for individual changes,
+ * we sum up the size of memory to free so we can update the memory
+ * counter all together below. This saves costs of maintaining
+ * the max-heap.
+ */
+ mem_freed += ReorderBufferChangeSize(change);
+
ReorderBufferReturnChange(rb, change, false);
}
/* Update the memory counter */
- ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
/*
* Mark the transaction as streamed.
@@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
rb->stream_stop(rb, txn, last_lsn);
ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
}
+
+ /* All changes must be deallocated */
+ Assert(txn->size == 0);
}
/*