aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/commands/subscriptioncmds.c38
-rw-r--r--src/backend/tcop/utility.c5
2 files changed, 32 insertions, 11 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0e081388c47..0036d99c2e0 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -18,6 +18,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
+#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
@@ -204,7 +205,7 @@ publicationListToArray(List *publist)
* Create new subscription.
*/
ObjectAddress
-CreateSubscription(CreateSubscriptionStmt *stmt)
+CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -221,6 +222,23 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
bool create_slot;
List *publications;
+ /*
+ * Parse and check options.
+ * Connection and publication should not be specified here.
+ */
+ parse_subscription_options(stmt->options, NULL, NULL,
+ &enabled_given, &enabled,
+ &create_slot, &slotname);
+
+ /*
+ * Since creating a replication slot is not transactional, rolling back
+ * the transaction leaves the created replication slot. So we cannot run
+ * CREATE SUBSCRIPTION inside a transaction block if creating a
+ * replication slot.
+ */
+ if (create_slot)
+ PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... CREATE SLOT");
+
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
@@ -239,13 +257,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
stmt->subname)));
}
- /*
- * Parse and check options.
- * Connection and publication should not be specified here.
- */
- parse_subscription_options(stmt->options, NULL, NULL,
- &enabled_given, &enabled,
- &create_slot, &slotname);
if (slotname == NULL)
slotname = stmt->subname;
@@ -424,7 +435,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
* Drop a subscription
*/
void
-DropSubscription(DropSubscriptionStmt *stmt)
+DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -441,6 +452,15 @@ DropSubscription(DropSubscriptionStmt *stmt)
WalReceiverConn *wrconn = NULL;
StringInfoData cmd;
+ /*
+ * Since dropping a replication slot is not transactional, the replication
+ * slot stays dropped even if the transaction rolls back. So we cannot
+ * run DROP SUBSCRIPTION inside a transaction block if dropping the
+ * replication slot.
+ */
+ if (stmt->drop_slot)
+ PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
+
rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 3bc0ae5e7e6..20b52734054 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1609,7 +1609,8 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_CreateSubscriptionStmt:
- address = CreateSubscription((CreateSubscriptionStmt *) parsetree);
+ address = CreateSubscription((CreateSubscriptionStmt *) parsetree,
+ isTopLevel);
break;
case T_AlterSubscriptionStmt:
@@ -1617,7 +1618,7 @@ ProcessUtilitySlow(ParseState *pstate,
break;
case T_DropSubscriptionStmt:
- DropSubscription((DropSubscriptionStmt *) parsetree);
+ DropSubscription((DropSubscriptionStmt *) parsetree, isTopLevel);
/* no commands stashed for DROP */
commandCollected = true;
break;