aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- src/backend/replication/logical/worker.c | 162
1 files changed, 118 insertions, 44 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8653e1d8402..a1fe81b34f3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -305,6 +305,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void);
+static void DisableSubscriptionAndExit(void);
+
/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
@@ -3374,6 +3376,84 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
+/*
+ * Run the initial table sync via LogicalRepSyncTableStart() under PG_TRY.
+ * On error: if MySubscription->disableonerr is set, disable the subscription
+ * and exit; otherwise report the failure to pgstat and re-throw the error.
+ * On success, store in *myslotname a copy of the returned slot name that is
+ * allocated in ApplyContext (long-lived). FATAL errors are not handled here:
+ * they are probably due to system resource errors and are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+ char *syncslotname;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync; its return value is the sync slot name. */
+ syncslotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit(); /* ends in proc_exit(0) */
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+ pfree(syncslotname);
+}
+
+/*
+ * Run LogicalRepApplyLoop(origin_startpos) under PG_TRY. On error: if
+ * MySubscription->disableonerr is set, disable the subscription and exit;
+ * otherwise report the failure to pgstat and re-throw the error.
+ * FATAL errors are not handled here: they are probably due to system
+ * resource errors and are not repeatable.
+ */
+static void
+start_apply(XLogRecPtr origin_startpos)
+{
+ PG_TRY();
+ {
+ LogicalRepApplyLoop(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit(); /* ends in proc_exit(0) */
+ else
+ {
+ /*
+ * Report the worker failed while applying changes (the flag is
+ * false when this is a tablesync worker). Abort the current
+ * transaction so that the stats message is sent in an idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -3381,8 +3461,8 @@ ApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *myslotname = NULL;
WalRcvStreamOptions options;
int server_version;
@@ -3477,32 +3557,7 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
- char *syncslotname;
-
- PG_TRY();
- {
- /* This is table synchronization worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message
- * in an idle state.
- */
- AbortOutOfAnyTransaction();
-
- /* Report the worker failed during table synchronization */
- pgstat_report_subscription_error(MySubscription->oid, false);
-
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- /* allocate slot name in long-lived context */
- myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-
- pfree(syncslotname);
+ start_table_sync(&origin_startpos, &myslotname);
/*
* Allocate the origin name in long-lived context for error context
@@ -3633,24 +3688,43 @@ ApplyWorkerMain(Datum main_arg)
}
/* Run the main loop. */
- PG_TRY();
- {
- LogicalRepApplyLoop(origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message in
- * an idle state.
- */
- AbortOutOfAnyTransaction();
+ start_apply(origin_startpos);
- /* Report the worker failed while applying changes */
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ proc_exit(0);
+}
- PG_RE_THROW();
- }
- PG_END_TRY();
+/*
+ * Emit the current error, return to an idle state, report the failure,
+ * disable the subscription in a new transaction, log it, and proc_exit(0).
+ */
+static void
+DisableSubscriptionAndExit(void)
+{
+ /*
+ * Emit the error message and recover from the error state to an idle
+ * state, with interrupts held off for the duration
+ */
+ HOLD_INTERRUPTS();
+
+ EmitErrorReport();
+ AbortOutOfAnyTransaction();
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ /* Report the worker failed during either table synchronization or apply */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ !am_tablesync_worker());
+
+ /* Disable the subscription in its own transaction */
+ StartTransactionCommand();
+ DisableSubscription(MySubscription->oid);
+ CommitTransactionCommand();
+
+ /* Log that the subscription has been disabled, then exit */
+ ereport(LOG,
+ errmsg("logical replication subscription \"%s\" has been disabled due to an error",
+ MySubscription->name));
proc_exit(0);
}