aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xact.c6
-rw-r--r--src/backend/commands/alter.c4
-rw-r--r--src/backend/commands/subscriptioncmds.c6
-rw-r--r--src/backend/replication/logical/worker.c52
-rw-r--r--src/include/replication/logicalworker.h4
5 files changed, 72 insertions, 0 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7f..8daa7f7d446 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
AtEOXact_PgStat(true, is_parallel_worker);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
+ AtEOXact_LogicalRepWorkers(true);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@@ -2647,6 +2649,9 @@ PrepareTransaction(void)
AtEOXact_HashTables(true);
/* don't call AtEOXact_PgStat here; we fixed pgstat state above */
AtEOXact_Snapshot(true, true);
+ /* we treat PREPARE as ROLLBACK so far as waking workers goes */
+ AtEOXact_ApplyLauncher(false);
+ AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@@ -2860,6 +2865,7 @@ AbortTransaction(void)
AtEOXact_HashTables(false);
AtEOXact_PgStat(false, is_parallel_worker);
AtEOXact_ApplyLauncher(false);
+ AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a7..bea51b3af1f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
#include "commands/user.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
+#include "replication/logicalworker.h"
#include "rewrite/rewriteDefine.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@@ -279,6 +280,9 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
if (strncmp(new_name, "regress_", 8) != 0)
elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
#endif
+
+ /* Wake up related replication workers to handle this change quickly */
+ LogicalRepWorkersWakeupAtCommit(objectId);
}
else if (nameCacheId >= 0)
{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796fc..f15a332bae3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
+ /* Wake up related replication workers to handle this change quickly. */
+ LogicalRepWorkersWakeupAtCommit(subid);
+
return myself;
}
@@ -1732,7 +1736,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
InvokeObjectPostAlterHook(SubscriptionRelationId,
form->oid, 0);
+ /* Wake up related background processes to handle this change quickly. */
ApplyLauncherWakeupAtCommit();
+ LogicalRepWorkersWakeupAtCommit(form->oid);
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8e8cf71eb8..f8649e142c3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;
+static List *on_commit_wakeup_workers_subids = NIL;
+
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
@@ -4092,3 +4094,53 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.remote_attnum = -1;
set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
+
+/*
+ * Request wakeup of the workers for the given subscription OID
+ * at commit of the current transaction.
+ *
+ * This is used to ensure that the workers process assorted changes
+ * as soon as possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+ MemoryContext oldcxt;
+
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+ on_commit_wakeup_workers_subids =
+ list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Wake up the workers of any subscriptions that were changed in this xact.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+ if (isCommit && on_commit_wakeup_workers_subids != NIL)
+ {
+ ListCell *lc;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ foreach(lc, on_commit_wakeup_workers_subids)
+ {
+ Oid subid = lfirst_oid(lc);
+ List *workers;
+ ListCell *lc2;
+
+ workers = logicalrep_workers_find(subid, true);
+ foreach(lc2, workers)
+ {
+ LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
+
+ logicalrep_worker_wakeup_ptr(worker);
+ }
+ }
+ LWLockRelease(LogicalRepWorkerLock);
+ }
+
+ /* The List storage will be reclaimed automatically in xact cleanup. */
+ on_commit_wakeup_workers_subids = NIL;
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a3484..e484662b723 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,8 @@ extern void ApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
#endif /* LOGICALWORKER_H */