aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c236
1 files changed, 211 insertions, 25 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index ccbdbcf08f9..19cc8046786 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -31,8 +31,11 @@
* table state to INIT.
* - Tablesync worker starts; changes table state from INIT to DATASYNC while
* copying.
- * - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
- * waits for state change.
+ * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
+ * worker specific) state to indicate when the copy phase has completed, so
+ * if the worker crashes with this (non-memory) state then the copy will not
+ * be re-attempted.
+ * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
* - Apply worker periodically checks for tables in SYNCWAIT state. When
* any appear, it sets the table state to CATCHUP and starts loop-waiting
* until either the table state is set to SYNCDONE or the sync worker
@@ -48,8 +51,8 @@
* point it sets state to READY and stops tracking. Again, there might
* be zero changes in between.
*
- * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- * CATCHUP -> SYNCDONE -> READY.
+ * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
+ * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
*
* The catalog pg_subscription_rel is used to keep information about
* subscribed tables and their state. The catalog holds all states
@@ -58,6 +61,7 @@
* Example flows look like this:
* - Apply is in front:
* sync:8
+ * -> set in catalog FINISHEDCOPY
* -> set in memory SYNCWAIT
* apply:10
* -> set in memory CATCHUP
@@ -73,6 +77,7 @@
*
* - Sync is in front:
* sync:10
+ * -> set in catalog FINISHEDCOPY
* -> set in memory SYNCWAIT
* apply:8
* -> set in memory CATCHUP
@@ -101,7 +106,10 @@
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
#include "storage/ipc.h"
+#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -269,26 +277,52 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
static void
process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
- Assert(IsTransactionState());
-
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
current_lsn >= MyLogicalRepWorker->relstate_lsn)
{
TimeLineID tli;
+ char syncslotname[NAMEDATALEN] = {0};
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn;
SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ /*
+ * UpdateSubscriptionRelState must be called within a transaction.
+ * That transaction will be ended within the finish_sync_worker().
+ */
+ if (!IsTransactionState())
+ StartTransactionCommand();
+
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
+ /* End wal streaming so wrconn can be re-used to drop the slot. */
walrcv_endstreaming(wrconn, &tli);
+
+ /*
+ * Cleanup the tablesync slot.
+ *
+ * This has to be done after updating the state because otherwise if
+ * there is an error while doing the database operations we won't be
+ * able to rollback dropped slot.
+ */
+ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ syncslotname);
+
+ /*
+ * It is important to give an error if we are unable to drop the slot,
+ * otherwise, it won't be dropped till the corresponding subscription
+ * is dropped. So passing missing_ok = false.
+ */
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+
finish_sync_worker();
}
else
@@ -403,6 +437,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
if (current_lsn >= rstate->lsn)
{
+ char originname[NAMEDATALEN];
+
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
@@ -411,6 +447,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
started_tx = true;
}
+ /*
+ * Remove the tablesync origin tracking if exists.
+ *
+ * The normal case origin drop is done here instead of in the
+ * process_syncing_tables_for_sync function because we don't
+ * allow to drop the origin till the process owning the origin
+ * is alive.
+ *
+ * There is a chance that the user is concurrently performing
+ * refresh for the subscription where we remove the table
+ * state and its origin and by this time the origin might be
+ * already removed. So passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+ rstate->relid,
+ originname);
+ replorigin_drop_by_name(originname, true, false);
+
+ /*
+ * Update the state to READY only after the origin cleanup.
+ */
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
rstate->lsn);
@@ -806,6 +863,50 @@ copy_table(Relation rel)
}
/*
+ * Determine the tablesync slot name.
+ *
+ * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
+ * on slot name length. We append system_identifier to avoid slot_name
+ * collision with subscriptions in other clusters. With the current scheme
+ * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
+ * length of slot_name will be 50.
+ *
+ * The returned slot name is either:
+ * - stored in the supplied buffer (syncslotname), or
+ * - palloc'ed in current memory context (if syncslotname = NULL).
+ *
+ * Note: We don't use the subscription slot name as part of tablesync slot name
+ * because we are responsible for cleaning up these slots and it could become
+ * impossible to recalculate what name to cleanup if the subscription slot name
+ * had changed.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+ char syncslotname[NAMEDATALEN])
+{
+ if (syncslotname)
+ sprintf(syncslotname, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid,
+ GetSystemIdentifier());
+ else
+ syncslotname = psprintf("pg_%u_sync_%u_" UINT64_FORMAT, suboid, relid,
+ GetSystemIdentifier());
+
+ return syncslotname;
+}
+
+/*
+ * Form the origin name for tablesync.
+ *
+ * Return the name in the supplied buffer.
+ */
+void
+ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
+ char originname[NAMEDATALEN])
+{
+ snprintf(originname, NAMEDATALEN, "pg_%u_%u", suboid, relid);
+}
+
+/*
* Start syncing the table in the sync worker.
*
* If nothing needs to be done to sync the table, we exit the worker without
@@ -822,6 +923,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
XLogRecPtr relstate_lsn;
Relation rel;
WalRcvExecResult *res;
+ char originname[NAMEDATALEN];
+ RepOriginId originid;
/* Check the state of the table synchronization. */
StartTransactionCommand();
@@ -847,19 +950,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
finish_sync_worker(); /* doesn't return */
}
- /*
- * To build a slot name for the sync work, we are limited to NAMEDATALEN -
- * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
- * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
- * NAMEDATALEN on the remote that matters, but this scheme will also work
- * reasonably if that is different.)
- */
- StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
- slotname = psprintf("%.*s_%u_sync_%u",
- NAMEDATALEN - 28,
- MySubscription->slotname,
- MySubscription->oid,
- MyLogicalRepWorker->relid);
+ /* Calculate the name of the tablesync slot. */
+ slotname = ReplicationSlotNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ NULL /* use palloc */ );
/*
* Here we use the slot name instead of the subscription name as the
@@ -872,7 +966,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
(errmsg("could not connect to the publisher: %s", err)));
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
- MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+ MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+ MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+
+ /* Assign the origin tracking record name. */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname);
+
+ if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
+ {
+ /*
+ * We have previously errored out before finishing the copy so the
+ * replication slot might exist. We want to remove the slot if it
+ * already exists and proceed.
+ *
+ * XXX We could also instead try to drop the slot, last time we failed
+ * but for that, we might need to clean up the copy state as it might
+ * be in the middle of fetching the rows. Also, if there is a network
+ * breakdown then it wouldn't have succeeded so trying it next time
+ * seems like a better bet.
+ */
+ ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+ }
+ else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
+ {
+ /*
+ * The COPY phase was previously done, but tablesync then crashed
+ * before it was able to finish normally.
+ */
+ StartTransactionCommand();
+
+ /*
+ * The origin tracking name must already exist. It was created first
+ * time this tablesync was launched.
+ */
+ originid = replorigin_by_name(originname, false);
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ *origin_startpos = replorigin_session_get_progress(false);
+
+ CommitTransactionCommand();
+
+ goto copy_table_done;
+ }
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -888,9 +1025,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommitTransactionCommand();
pgstat_report_stat(false);
- /*
- * We want to do the table data sync in a single transaction.
- */
StartTransactionCommand();
/*
@@ -916,13 +1050,46 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
walrcv_clear_result(res);
/*
- * Create a new temporary logical decoding slot. This slot will be used
+ * Create a new permanent logical decoding slot. This slot will be used
* for the catchup phase after COPY is done, so tell it to use the
* snapshot to make the final data consistent.
*/
- walrcv_create_slot(wrconn, slotname, true,
+ walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
CRS_USE_SNAPSHOT, origin_startpos);
+ /*
+ * Setup replication origin tracking. The purpose of doing this before the
+ * copy is to avoid doing the copy again due to any error in setting up
+ * origin tracking.
+ */
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ {
+ /*
+ * Origin tracking does not exist, so create it now.
+ *
+ * Then advance to the LSN got from walrcv_create_slot. This is WAL
+ * logged for the purpose of recovery. Locks are to prevent the
+ * replication origin from vanishing while advancing.
+ */
+ originid = replorigin_create(originname);
+
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+ true /* go backward */ , true /* WAL log */ );
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("replication origin \"%s\" already exists",
+ originname)));
+ }
+
/* Now do the initial data copy */
PushActiveSnapshot(GetTransactionSnapshot());
copy_table(rel);
@@ -941,6 +1108,25 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommandCounterIncrement();
/*
+ * Update the persisted state to indicate the COPY phase is done; make it
+ * visible to others.
+ */
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_FINISHEDCOPY,
+ MyLogicalRepWorker->relstate_lsn);
+
+ CommitTransactionCommand();
+
+copy_table_done:
+
+ elog(DEBUG1,
+ "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+ originname,
+ (uint32) (*origin_startpos >> 32),
+ (uint32) *origin_startpos);
+
+ /*
* We are done with the initial data synchronization, update the state.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);