aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c237
-rw-r--r--src/include/replication/reorderbuffer.h4
2 files changed, 214 insertions, 27 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 07eebedbac9..5cf28d4df42 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,21 @@
* allocator, evicting the oldest changes would make it more likely the
* memory gets actually freed.
*
+ * We use a max-heap with transaction size as the key to efficiently find
+ * the largest transaction. While the max-heap is empty, we don't update
+ * the max-heap when updating the memory counter. Therefore, we can get
+ * the largest transaction in O(N) time, where N is the number of
+ * transactions including top-level transactions and subtransactions.
+ *
+ * We build the max-heap just before selecting the largest transactions
+ * if the number of transactions being decoded is higher than the threshold,
+ * MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
+ * update the max-heap when updating the memory counter. The intention is
+ * to efficiently find the largest transaction in O(1) time instead of
+ * incurring the cost of memory counter updates (O(log N)). Once the number
+ * of transactions got lower than the threshold, we reset the max-heap
+ * (refer to ReorderBufferMaybeResetMaxHeap() for details).
+ *
* We still rely on max_changes_in_memory when loading serialized changes
* back into memory. At that point we can't use the memory limit directly
* as we load the subxacts independently. One option to deal with this
@@ -107,6 +122,22 @@
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
+/*
+ * Threshold of the total number of top-level and sub transactions that
+ * controls whether we use the max-heap for tracking their sizes. Although
+ * using the max-heap to select the largest transaction is effective when
+ * there are many transactions being decoded, maintaining the max-heap while
+ * updating the memory statistics can be costly. Therefore, we use
+ * MaxConnections as the threshold so that we use the max-heap only when
+ * using subtransactions.
+ */
+#define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
+
+/*
+ * A macro to check if the max-heap is ready to use and needs to be updated
+ * accordingly.
+ */
+#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
@@ -259,6 +290,9 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
+static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb);
+static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -293,6 +327,7 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
+ ReorderBufferTXN *txn,
bool addition, Size sz);
/*
@@ -355,6 +390,17 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0;
buffer->size = 0;
+ /*
+ * The binaryheap is indexed for faster manipulations.
+ *
+ * We allocate the initial heap size greater than
+ * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
+ * until the threshold is exceeded.
+ */
+ buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
+ ReorderBufferTXNSizeCompare,
+ true, NULL);
+
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
@@ -485,7 +531,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
{
/* update memory accounting info */
if (upd_mem)
- ReorderBufferChangeMemoryUpdate(rb, change, false,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
ReorderBufferChangeSize(change));
/* free contained data */
@@ -816,7 +862,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn->nentries_mem++;
/* update memory accounting information */
- ReorderBufferChangeMemoryUpdate(rb, change, true,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change));
/* process partial change */
@@ -1527,7 +1573,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */
Assert(change->txn == txn);
- ReorderBufferReturnChange(rb, change, true);
+ ReorderBufferReturnChange(rb, change, false);
}
/*
@@ -1586,8 +1632,17 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
/* deallocate */
ReorderBufferReturnTXN(rb, txn);
+
+ /*
+ * After cleaning up one transaction, the number of transactions might get
+ * lower than the threshold for the max-heap.
+ */
+ ReorderBufferMaybeResetMaxHeap(rb);
}
/*
@@ -1637,9 +1692,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* remove the change from it's containing list */
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change, true);
+ ReorderBufferReturnChange(rb, change, false);
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
/*
* Mark the transaction as streamed.
*
@@ -3166,6 +3224,9 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
* decide if we reached the memory limit, the transaction counter allows
* us to quickly pick the largest transaction for eviction.
*
+ * Either txn or change must be non-NULL at least. We update the memory
+ * counter of txn if it's non-NULL, otherwise change->txn.
+ *
* When streaming is enabled, we need to update the toplevel transaction
* counters instead - we don't really care about subtransactions as we
* can't stream them individually anyway, and we only pick toplevel
@@ -3174,22 +3235,27 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
static void
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
+ ReorderBufferTXN *txn,
bool addition, Size sz)
{
- ReorderBufferTXN *txn;
ReorderBufferTXN *toptxn;
- Assert(change->txn);
+ Assert(txn || change);
/*
* Ignore tuple CID changes, because those are not evicted when reaching
* memory limit. So we just don't count them, because it might easily
* trigger a pointless attempt to spill.
*/
- if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+ if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
return;
- txn = change->txn;
+ if (sz == 0)
+ return;
+
+ if (txn == NULL)
+ txn = change->txn;
+ Assert(txn != NULL);
/*
* Update the total size in top level as well. This is later used to
@@ -3204,6 +3270,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */
toptxn->total_size += sz;
+
+ /* Update the max-heap as well if necessary */
+ if (ReorderBufferMaxHeapIsReady(rb))
+ {
+ if ((txn->size - sz) == 0)
+ binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+ else
+ binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+ }
}
else
{
@@ -3213,6 +3288,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */
toptxn->total_size -= sz;
+
+ /* Update the max-heap as well if necessary */
+ if (ReorderBufferMaxHeapIsReady(rb))
+ {
+ if (txn->size == 0)
+ binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+ else
+ binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+ }
}
Assert(txn->size <= rb->size);
@@ -3468,34 +3552,123 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
}
}
+
+/* Compare two transactions by size */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+ ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a);
+ ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+ if (ta->size < tb->size)
+ return -1;
+ if (ta->size > tb->size)
+ return 1;
+ return 0;
+}
+
/*
- * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
+ * Build the max-heap. The heap assembly step is deferred until the end, for
+ * efficiency.
*/
-static ReorderBufferTXN *
-ReorderBufferLargestTXN(ReorderBuffer *rb)
+static void
+ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
{
HASH_SEQ_STATUS hash_seq;
ReorderBufferTXNByIdEnt *ent;
- ReorderBufferTXN *largest = NULL;
+
+ Assert(binaryheap_empty(rb->txn_heap));
hash_seq_init(&hash_seq, rb->by_txn);
while ((ent = hash_seq_search(&hash_seq)) != NULL)
{
ReorderBufferTXN *txn = ent->txn;
- /* if the current transaction is larger, remember it */
- if ((!largest) || (txn->size > largest->size))
- largest = txn;
+ if (txn->size == 0)
+ continue;
+
+ binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
}
+ binaryheap_build(rb->txn_heap);
+}
+
+/*
+ * Reset the max-heap if the number of transactions got lower than the
+ * threshold.
+ */
+static void
+ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
+{
+ /*
+ * If we add and remove transactions right around the threshold, we could
+ * easily end up "thrashing". To avoid it, we adapt 10% of transactions to
+ * reset the max-heap.
+ */
+ if (ReorderBufferMaxHeapIsReady(rb) &&
+ binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
+ binaryheap_reset(rb->txn_heap);
+}
+
+/*
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk)
+ * by doing a linear search or using the max-heap depending on the number of
+ * transactions in ReorderBuffer. Refer to the comments atop this file for the
+ * algorithm details.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTXN(ReorderBuffer *rb)
+{
+ ReorderBufferTXN *largest = NULL;
+
+ if (!ReorderBufferMaxHeapIsReady(rb))
+ {
+ /*
+ * If the number of transactions are small, we scan all transactions
+ * being decoded to get the largest transaction. This saves the cost
+ * of building a max-heap with a small number of transactions.
+ */
+ if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
+ {
+ HASH_SEQ_STATUS hash_seq;
+ ReorderBufferTXNByIdEnt *ent;
+
+ hash_seq_init(&hash_seq, rb->by_txn);
+ while ((ent = hash_seq_search(&hash_seq)) != NULL)
+ {
+ ReorderBufferTXN *txn = ent->txn;
+
+ /* if the current transaction is larger, remember it */
+ if ((!largest) || (txn->size > largest->size))
+ largest = txn;
+ }
+
+ Assert(largest);
+ }
+ else
+ {
+ /*
+ * There are a large number of transactions in ReorderBuffer. We
+ * build the max-heap for efficiently selecting the largest
+ * transactions.
+ */
+ ReorderBufferBuildMaxHeap(rb);
+
+ /*
+ * The max-heap is ready now. We remain the max-heap at least
+ * until we free up enough transactions to bring the total memory
+ * usage below the limit. The largest transaction is selected
+ * below.
+ */
+ Assert(ReorderBufferMaxHeapIsReady(rb));
+ }
+ }
+
+ /* Get the largest transaction from the max-heap */
+ if (ReorderBufferMaxHeapIsReady(rb))
+ largest = (ReorderBufferTXN *)
+ DatumGetPointer(binaryheap_first(rb->txn_heap));
+
Assert(largest);
Assert(largest->size > 0);
Assert(largest->size <= rb->size);
@@ -3638,6 +3811,13 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
/* We must be under the memory limit now. */
Assert(rb->size < logical_decoding_work_mem * 1024L);
+
+ /*
+ * After evicting some transactions, the number of transactions might get
+ * lower than the threshold for the max-heap.
+ */
+ ReorderBufferMaybeResetMaxHeap(rb);
+
}
/*
@@ -3705,11 +3885,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferSerializeChange(rb, txn, fd, change);
dlist_delete(&change->node);
- ReorderBufferReturnChange(rb, change, true);
+ ReorderBufferReturnChange(rb, change, false);
spilled++;
}
+ /* Update the memory counter */
+ ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
+
/* update the statistics iff we have spilled anything */
if (spilled)
{
@@ -4491,7 +4674,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
* update the accounting too (subtracting the size from the counters). And
* we don't want to underflow there.
*/
- ReorderBufferChangeMemoryUpdate(rb, change, true,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change));
}
@@ -4903,9 +5086,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
MemoryContextSwitchTo(oldcontext);
/* subtract the old change size */
- ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
/* now add the change back, with the correct size */
- ReorderBufferChangeMemoryUpdate(rb, change, true,
+ ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
ReorderBufferChangeSize(change));
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa0..a5aec01c2f0 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
#define REORDERBUFFER_H
#include "access/htup_details.h"
+#include "lib/binaryheap.h"
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "utils/hsearch.h"
@@ -631,6 +632,9 @@ struct ReorderBuffer
/* memory accounting */
Size size;
+ /* Max-heap for sizes of all top-level and sub transactions */
+ binaryheap *txn_heap;
+
/*
* Statistics about transactions spilled to disk.
*