aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2022-03-14 09:32:40 +0530
committerAmit Kapila <akapila@postgresql.org>2022-03-14 09:32:40 +0530
commit705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33 (patch)
treed6606b7864bdcc3439caaeccfc0dc3a31a9386e3 /src/backend
parent369398ed886dc13956654777467536625e6fc7ee (diff)
downloadpostgresql-705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33.tar.gz
postgresql-705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33.zip
Optionally disable subscriptions on error.
Logical replication apply workers for a subscription can easily get stuck in an infinite loop of attempting to apply a change, triggering an error (such as a constraint violation), exiting with the error written to the subscription server log, and restarting. To partially remedy the situation, this patch adds a new subscription option named 'disable_on_error'. To be consistent with old behavior, this option defaults to false. When true, both the tablesync worker and apply worker catch any errors thrown and disable the subscription in order to break the loop. The error is still also written in the logs. Once the subscription is disabled, users can either manually resolve the conflict/error or skip the conflicting transaction by using pg_replication_origin_advance() function. After resolving the conflict, users need to enable the subscription to allow apply process to proceed. Author: Osumi Takamichi and Mark Dilger Reviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi Yu Discussion : https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_subscription.c40
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/commands/subscriptioncmds.c27
-rw-r--r--src/backend/replication/logical/worker.c162
4 files changed, 185 insertions, 47 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd201..a6304f5f81a 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->binary = subform->subbinary;
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
+ sub->disableonerr = subform->subdisableonerr;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -157,6 +158,45 @@ FreeSubscription(Subscription *sub)
}
/*
+ * Disable the given subscription.
+ */
+void
+DisableSubscription(Oid subid)
+{
+ Relation rel;
+ bool nulls[Natts_pg_subscription];
+ bool replaces[Natts_pg_subscription];
+ Datum values[Natts_pg_subscription];
+ HeapTuple tup;
+
+ /* Look up the subscription in the catalog */
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ /* Form a new tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ /* Set the subscription to disabled. */
+ values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+ replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+ /* Update the catalog */
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+ heap_freetuple(tup);
+
+ table_close(rel, NoLock);
+}
+
+/*
* get_subscription_oid - given a subscription name, look up the OID
*
* If missing_ok is false, throw an error if name not found. If true, just
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca5a96..bb1ac30cd19 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
- substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+ substream, subtwophasestate, subdisableonerr, subslotname,
+ subsynccommit, subpublications)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d246..3922658bbca 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
+#define SUBOPT_DISABLE_ON_ERR 0x00000400
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
bool binary;
bool streaming;
bool twophase;
+ bool disableonerr;
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->streaming = false;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
+ if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+ opts->disableonerr = false;
/* Parse options */
foreach(lc, stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
opts->twophase = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+ strcmp(defel->defname, "disable_on_error") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+ opts->disableonerr = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
+ values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING);
+ SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_substream - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+ {
+ values[Anum_pg_subscription_subdisableonerr - 1]
+ = BoolGetDatum(opts.disableonerr);
+ replaces[Anum_pg_subscription_subdisableonerr - 1]
+ = true;
+ }
+
update_tuple = true;
break;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8653e1d8402..a1fe81b34f3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -305,6 +305,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void);
+static void DisableSubscriptionAndExit(void);
+
/* prototype needed because of stream_commit */
static void apply_dispatch(StringInfo s);
@@ -3374,6 +3376,84 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
}
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+ char *syncslotname;
+
+ Assert(am_tablesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during table synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, false);
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+
+ /* allocate slot name in long-lived context */
+ *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+ pfree(syncslotname);
+}
+
+/*
+ * Run the apply loop with error handling. Disable the subscription,
+ * if necessary.
+ *
+ * Note that we don't handle FATAL errors which are probably because
+ * of system resource error and are not repeatable.
+ */
+static void
+start_apply(XLogRecPtr origin_startpos)
+{
+ PG_TRY();
+ {
+ LogicalRepApplyLoop(origin_startpos);
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed while applying changes. Abort the
+ * current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
@@ -3381,8 +3461,8 @@ ApplyWorkerMain(Datum main_arg)
int worker_slot = DatumGetInt32(main_arg);
MemoryContext oldctx;
char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char *myslotname = NULL;
WalRcvStreamOptions options;
int server_version;
@@ -3477,32 +3557,7 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
- char *syncslotname;
-
- PG_TRY();
- {
- /* This is table synchronization worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message
- * in an idle state.
- */
- AbortOutOfAnyTransaction();
-
- /* Report the worker failed during table synchronization */
- pgstat_report_subscription_error(MySubscription->oid, false);
-
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- /* allocate slot name in long-lived context */
- myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-
- pfree(syncslotname);
+ start_table_sync(&origin_startpos, &myslotname);
/*
* Allocate the origin name in long-lived context for error context
@@ -3633,24 +3688,43 @@ ApplyWorkerMain(Datum main_arg)
}
/* Run the main loop. */
- PG_TRY();
- {
- LogicalRepApplyLoop(origin_startpos);
- }
- PG_CATCH();
- {
- /*
- * Abort the current transaction so that we send the stats message in
- * an idle state.
- */
- AbortOutOfAnyTransaction();
+ start_apply(origin_startpos);
- /* Report the worker failed while applying changes */
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ proc_exit(0);
+}
- PG_RE_THROW();
- }
- PG_END_TRY();
+/*
+ * After error recovery, disable the subscription in a new transaction
+ * and exit cleanly.
+ */
+static void
+DisableSubscriptionAndExit(void)
+{
+ /*
+ * Emit the error message, and recover from the error state to an idle
+ * state
+ */
+ HOLD_INTERRUPTS();
+
+ EmitErrorReport();
+ AbortOutOfAnyTransaction();
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ /* Report the worker failed during either table synchronization or apply */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ !am_tablesync_worker());
+
+ /* Disable the subscription */
+ StartTransactionCommand();
+ DisableSubscription(MySubscription->oid);
+ CommitTransactionCommand();
+
+ /* Notify the subscription has been disabled and exit */
+ ereport(LOG,
+ errmsg("logical replication subscription \"%s\" has been disabled due to an error",
+ MySubscription->name));
proc_exit(0);
}