diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 162 |
1 files changed, 118 insertions, 44 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8653e1d8402..a1fe81b34f3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -305,6 +305,8 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +static void DisableSubscriptionAndExit(void); + /* prototype needed because of stream_commit */ static void apply_dispatch(StringInfo s); @@ -3374,6 +3376,84 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); } +/* + * Execute the initial sync with error handling. Disable the subscription + * on error if disableonerr is set; otherwise report the error and re-throw. + * + * On success, *myslotname is a copy of the slot name in ApplyContext. Note + * that we don't handle FATAL errors, which are probably caused by system + * resource errors and are not repeatable. + */ +static void +start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) +{ + char *syncslotname; + + Assert(am_tablesync_worker()); + + PG_TRY(); + { + /* Call initial sync. */ + syncslotname = LogicalRepSyncTableStart(origin_startpos); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report that the worker failed during table synchronization. Abort + * the current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, false); + + PG_RE_THROW(); + } + } + PG_END_TRY(); + + /* copy the slot name into the long-lived ApplyContext, free the original */ + *myslotname = MemoryContextStrdup(ApplyContext, syncslotname); + pfree(syncslotname); +} + +/* + * Run the apply loop with error handling. Disable the subscription + * on error if disableonerr is set; otherwise report the error and re-throw. + * + * Note that we don't handle FATAL errors, which are probably caused by + * system resource errors and are not repeatable.
+ */ +static void +start_apply(XLogRecPtr origin_startpos) +{ + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + if (MySubscription->disableonerr) + DisableSubscriptionAndExit(); + else + { + /* + * Report that the worker failed while applying changes. Abort the + * current transaction so that the stats message is sent in an + * idle state. + */ + AbortOutOfAnyTransaction(); + pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3381,8 +3461,8 @@ ApplyWorkerMain(Datum main_arg) int worker_slot = DatumGetInt32(main_arg); MemoryContext oldctx; char originname[NAMEDATALEN]; - XLogRecPtr origin_startpos; - char *myslotname; + XLogRecPtr origin_startpos = InvalidXLogRecPtr; + char *myslotname = NULL; WalRcvStreamOptions options; int server_version; @@ -3477,32 +3557,7 @@ ApplyWorkerMain(Datum main_arg) if (am_tablesync_worker()) { - char *syncslotname; - - PG_TRY(); - { - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); - } - PG_CATCH(); - { - /* - * Abort the current transaction so that we send the stats message - * in an idle state. - */ - AbortOutOfAnyTransaction(); - - /* Report the worker failed during table synchronization */ - pgstat_report_subscription_error(MySubscription->oid, false); - - PG_RE_THROW(); - } - PG_END_TRY(); - - /* allocate slot name in long-lived context */ - myslotname = MemoryContextStrdup(ApplyContext, syncslotname); - - pfree(syncslotname); + start_table_sync(&origin_startpos, &myslotname); /* * Allocate the origin name in long-lived context for error context @@ -3633,24 +3688,43 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. 
*/ - PG_TRY(); - { - LogicalRepApplyLoop(origin_startpos); - } - PG_CATCH(); - { - /* - * Abort the current transaction so that we send the stats message in - * an idle state. - */ - AbortOutOfAnyTransaction(); + start_apply(origin_startpos); - /* Report the worker failed while applying changes */ - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + proc_exit(0); +} - PG_RE_THROW(); - } - PG_END_TRY(); +/* + * After error recovery, disable the subscription in a new transaction, + * log that it was disabled, and exit cleanly via proc_exit(0). + */ +static void +DisableSubscriptionAndExit(void) +{ + /* + * Emit the error message, and recover from the error state to an idle + * state. + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report that the worker failed during either table sync or apply */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + !am_tablesync_worker()); + + /* Disable the subscription */ + StartTransactionCommand(); + DisableSubscription(MySubscription->oid); + CommitTransactionCommand(); + + /* Log that the subscription has been disabled, then exit */ + ereport(LOG, + errmsg("logical replication subscription \"%s\" has been disabled due to an error", + MySubscription->name)); proc_exit(0); } |