aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2017-05-09 14:40:42 -0400
committerPeter Eisentraut <peter_e@gmx.net>2017-05-09 14:51:49 -0400
commit489b96e80b96c0eda02575347654e87968f2f5f4 (patch)
tree870433737c38221ed5676574780c2cde64342a35 /src
parente0bf16060be695ced920727fa29f0d9ede61bd3f (diff)
downloadpostgresql-489b96e80b96c0eda02575347654e87968f2f5f4.tar.gz
postgresql-489b96e80b96c0eda02575347654e87968f2f5f4.zip
Improve memory use in logical replication apply
Previously, the memory used by the logical replication apply worker for processing messages would never be freed, so that could end up using a lot of memory. To improve that, change the existing ApplyContext memory context to ApplyMessageContext and reset that after every message (similar to MessageContext used elsewhere). For consistency of naming, rename the ApplyCacheContext to ApplyContext. Author: Stas Kelvich <s.kelvich@postgrespro.ru>
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c51
-rw-r--r--src/backend/utils/mmgr/README11
-rw-r--r--src/include/replication/worker_internal.h4
3 files changed, 40 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 362de12457b..04813b506e1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
int attnum;
} SlotErrCallbackArg;
-static MemoryContext ApplyContext = NULL;
-MemoryContext ApplyCacheContext = NULL;
+static MemoryContext ApplyMessageContext = NULL;
+MemoryContext ApplyContext = NULL;
WalReceiverConn *wrconn = NULL;
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
/*
* Make sure that we started local transaction.
*
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
*/
static bool
ensure_transaction(void)
{
if (IsTransactionState())
{
- if (CurrentMemoryContext != ApplyContext)
- MemoryContextSwitchTo(ApplyContext);
+ if (CurrentMemoryContext != ApplyMessageContext)
+ MemoryContextSwitchTo(ApplyMessageContext);
+
return false;
}
@@ -162,7 +163,7 @@ ensure_transaction(void)
if (!MySubscriptionValid)
reread_subscription();
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
return true;
}
@@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn)
FlushPosition *flushpos;
/* Need to do this in permanent context */
- MemoryContextSwitchTo(ApplyCacheContext);
+ MemoryContextSwitchTo(ApplyContext);
/* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
@@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn)
flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node);
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
}
@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
- /* Init the ApplyContext which we use for easier cleanup. */
- ApplyContext = AllocSetContextCreate(TopMemoryContext,
- "ApplyContext",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ /*
+ * Init the ApplyMessageContext which we clean up after each
+ * replication protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
ping_sent = false;
/* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
s.data = buf;
s.len = len;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
/* other message types are purposefully ignored */
+
+ MemoryContextReset(ApplyMessageContext);
}
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* Cleanup the memory. */
- MemoryContextResetAndDeleteChildren(ApplyContext);
+ MemoryContextResetAndDeleteChildren(ApplyMessageContext);
MemoryContextSwitchTo(TopMemoryContext);
/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!reply_message)
{
- MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
@@ -1308,7 +1312,7 @@ reread_subscription(void)
}
/* Ensure allocations in permanent context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
MyLogicalRepWorker->userid);
/* Load the subscription into persistent memory context. */
- CreateCacheMemoryContext();
- ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
- "ApplyCacheContext",
+ ApplyContext = AllocSetContextCreate(TopMemoryContext,
+ "ApplyContext",
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
/* The slot name needs to be allocated in permanent memory context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/utils/mmgr/README b/src/backend/utils/mmgr/README
index 480b1f89d02..387c337985f 100644
--- a/src/backend/utils/mmgr/README
+++ b/src/backend/utils/mmgr/README
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
and don't actually need any storage allocated in their private contexts.
+Logical Replication Worker Contexts
+-----------------------------------
+
+ApplyContext --- permanent during whole lifetime of apply worker. It
+is possible to use TopMemoryContext here as well, but for simplicity
+of memory usage analysis we spin up different context.
+
+ApplyMessageContext --- short-lived context that is reset after each
+logical replication protocol message is processed.
+
+
Transient Contexts During Execution
-----------------------------------
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f6fee102b2a..26788fec5c1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
-/* Memory context for cached variables in apply worker. */
-extern MemoryContext ApplyCacheContext;
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern MemoryContext ApplyContext;
/* libpqreceiver connection */
extern struct WalReceiverConn *wrconn;