aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/catalog/pg_subscription.c13
-rw-r--r--src/backend/commands/subscriptioncmds.c4
-rw-r--r--src/backend/replication/logical/tablesync.c11
-rw-r--r--src/include/catalog/pg_subscription_rel.h2
4 files changed, 19 insertions, 11 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ab5f3719fc3..c5b2541319e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray)
/*
* Set the state of a subscription table.
*
+ * If update_only is true and the record for given table doesn't exist, do
+ * nothing. This can be used to avoid inserting a new record that was deleted
+ * by someone else. Generally, subscription DDL commands should use false,
+ * workers should use true.
+ *
* The insert-or-update logic in this function is not concurrency safe so it
* might raise an error in rare circumstances. But if we took a stronger lock
* such as ShareRowExclusiveLock, we would risk more deadlocks.
*/
Oid
SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn)
+ XLogRecPtr sublsn, bool update_only)
{
Relation rel;
HeapTuple tup;
- Oid subrelid;
+ Oid subrelid = InvalidOid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
@@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
* If the record for given table does not exist yet create new record,
* otherwise update the existing one.
*/
- if (!HeapTupleIsValid(tup))
+ if (!HeapTupleIsValid(tup) && !update_only)
{
/* Form the tuple. */
memset(values, 0, sizeof(values));
@@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
heap_freetuple(tup);
}
- else
+ else if (HeapTupleIsValid(tup))
{
bool replaces[Natts_pg_subscription_rel];
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ad98b38efe8..49737a90420 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
rv->schemaname, rv->relname);
SetSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr, false);
}
ereport(NOTICE,
@@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
SetSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr, false);
ereport(NOTICE,
(errmsg("added subscription for table %s.%s",
quote_identifier(rv->schemaname),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6fe39d20237..f57ae6ee2d5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ true);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
SetSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
- rstate->lsn);
+ rstate->lsn, true);
}
}
else
@@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
+ MyLogicalRepWorker->relstate_lsn,
+ true);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
SUBREL_STATE_SYNCDONE,
- *origin_startpos);
+ *origin_startpos,
+ true);
finish_sync_worker();
}
break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 391f96b76e4..f5f61916768 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -71,7 +71,7 @@ typedef struct SubscriptionRelState
} SubscriptionRelState;
extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn);
+ XLogRecPtr sublsn, bool update_only);
extern char GetSubscriptionRelState(Oid subid, Oid relid,
XLogRecPtr *sublsn, bool missing_ok);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);