aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c18
1 files changed, 15 insertions, 3 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 32245363561..92c755f346e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
bool *reserve_wal,
- CRSSnapshotAction *snapshot_action)
+ CRSSnapshotAction *snapshot_action,
+ bool *two_phase)
{
ListCell *lc;
bool snapshot_action_given = false;
bool reserve_wal_given = false;
+ bool two_phase_given = false;
/* Parse options */
foreach(lc, cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
reserve_wal_given = true;
*reserve_wal = true;
}
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_given = true;
+ *two_phase = true;
+ }
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char xloc[MAXFNAMELEN];
char *slot_name;
bool reserve_wal = false;
+ bool two_phase = false;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
DestReceiver *dest;
TupOutputState *tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert(!MyReplicationSlot);
- parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+ parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
/* setup state for WalSndSegmentOpen */
sendTimeLineIsHistoric = false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
- false);
+ two_phase);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)