aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c145
1 files changed, 73 insertions, 72 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 7e51076b376..1e3753b8fe2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -113,7 +113,8 @@ StringInfo copybuf = NULL;
/*
* Exit routine for synchronization worker.
*/
-static void pg_attribute_noreturn()
+static void
+pg_attribute_noreturn()
finish_sync_worker(void)
{
/*
@@ -148,12 +149,12 @@ finish_sync_worker(void)
static bool
wait_for_sync_status_change(Oid relid, char origstate)
{
- int rc;
- char state = origstate;
+ int rc;
+ char state = origstate;
while (!got_SIGTERM)
{
- LogicalRepWorker *worker;
+ LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
@@ -269,7 +270,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
struct tablesync_start_time_mapping
{
Oid relid;
- TimestampTz last_start_time;
+ TimestampTz last_start_time;
};
static List *table_states = NIL;
static HTAB *last_start_times = NULL;
@@ -281,9 +282,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* We need up to date sync state info for subscription tables here. */
if (!table_states_valid)
{
- MemoryContext oldctx;
- List *rstates;
- ListCell *lc;
+ MemoryContext oldctx;
+ List *rstates;
+ ListCell *lc;
SubscriptionRelState *rstate;
/* Clean the old list. */
@@ -294,7 +295,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
started_tx = true;
/* Fetch all non-ready tables. */
- rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
+ rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
@@ -324,6 +325,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
last_start_times = hash_create("Logical replication table sync worker start times",
256, &ctl, HASH_ELEM | HASH_BLOBS);
}
+
/*
* Clean up the hash table when we're done with all tables (just to
* release the bit of memory).
@@ -337,14 +339,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
/* Process all tables that are being synchronized. */
foreach(lc, table_states)
{
- SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
+ SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
if (rstate->state == SUBREL_STATE_SYNCDONE)
{
/*
- * Apply has caught up to the position where the table sync
- * has finished. Time to mark the table as ready so that
- * apply will just continue to replicate it normally.
+ * Apply has caught up to the position where the table sync has
+ * finished. Time to mark the table as ready so that apply will
+ * just continue to replicate it normally.
*/
if (current_lsn >= rstate->lsn)
{
@@ -362,8 +364,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
else
{
- LogicalRepWorker *syncworker;
- int nsyncworkers = 0;
+ LogicalRepWorker *syncworker;
+ int nsyncworkers = 0;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
@@ -376,6 +378,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
SpinLockRelease(&syncworker->relmutex);
}
else
+
/*
* If no sync worker for this table yet, count running sync
* workers for this subscription, while we have the lock, for
@@ -394,16 +397,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* There are three possible synchronization situations here.
*
* a) Apply is in front of the table sync: We tell the table
- * sync to CATCHUP.
+ * sync to CATCHUP.
*
* b) Apply is behind the table sync: We tell the table sync
- * to mark the table as SYNCDONE and finish.
-
+ * to mark the table as SYNCDONE and finish.
+ *
* c) Apply and table sync are at the same position: We tell
- * table sync to mark the table as READY and finish.
+ * table sync to mark the table as READY and finish.
*
- * In any case we'll need to wait for table sync to change
- * the state in catalog and only then continue ourselves.
+ * In any case we'll need to wait for table sync to change the
+ * state in catalog and only then continue ourselves.
*/
if (current_lsn > rstate->lsn)
{
@@ -427,20 +430,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
logicalrep_worker_wakeup_ptr(syncworker);
/*
- * Enter busy loop and wait for synchronization status
- * change.
+ * Enter busy loop and wait for synchronization status change.
*/
wait_for_sync_status_change(rstate->relid, rstate->state);
}
/*
- * If there is no sync worker registered for the table and
- * there is some free sync worker slot, start new sync worker
- * for the table.
+ * If there is no sync worker registered for the table and there
+ * is some free sync worker slot, start new sync worker for the
+ * table.
*/
else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
{
- TimestampTz now = GetCurrentTimestamp();
+ TimestampTz now = GetCurrentTimestamp();
struct tablesync_start_time_mapping *hentry;
bool found;
@@ -492,7 +494,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
for (i = 0; i < desc->natts; i++)
{
- int remoteattnum = rel->attrmap[i];
+ int remoteattnum = rel->attrmap[i];
/* Skip dropped attributes. */
if (desc->attrs[i]->attisdropped)
@@ -503,7 +505,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
continue;
attnamelist = lappend(attnamelist,
- makeString(rel->remoterel.attnames[remoteattnum]));
+ makeString(rel->remoterel.attnames[remoteattnum]));
}
return attnamelist;
@@ -516,8 +518,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
static int
copy_read_data(void *outbuf, int minread, int maxread)
{
- int bytesread = 0;
- int avail;
+ int bytesread = 0;
+ int avail;
/* If there are some leftover data from previous read, use them. */
avail = copybuf->len - copybuf->cursor;
@@ -601,13 +603,13 @@ static void
fetch_remote_table_info(char *nspname, char *relname,
LogicalRepRelation *lrel)
{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[2] = {OIDOID, CHAROID};
- Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
- bool isnull;
- int natt;
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {OIDOID, CHAROID};
+ Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+ bool isnull;
+ int natt;
lrel->nspname = nspname;
lrel->relname = relname;
@@ -615,14 +617,14 @@ fetch_remote_table_info(char *nspname, char *relname,
/* First fetch Oid and replica identity. */
initStringInfo(&cmd);
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
- " FROM pg_catalog.pg_class c"
- " INNER JOIN pg_catalog.pg_namespace n"
- " ON (c.relnamespace = n.oid)"
- " WHERE n.nspname = %s"
- " AND c.relname = %s"
- " AND c.relkind = 'r'",
- quote_literal_cstr(nspname),
- quote_literal_cstr(relname));
+ " FROM pg_catalog.pg_class c"
+ " INNER JOIN pg_catalog.pg_namespace n"
+ " ON (c.relnamespace = n.oid)"
+ " WHERE n.nspname = %s"
+ " AND c.relname = %s"
+ " AND c.relkind = 'r'",
+ quote_literal_cstr(nspname),
+ quote_literal_cstr(relname));
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
if (res->status != WALRCV_OK_TUPLES)
@@ -653,7 +655,7 @@ fetch_remote_table_info(char *nspname, char *relname,
" a.attnum = ANY(i.indkey)"
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
- " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
+ " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
" WHERE a.attnum > 0::pg_catalog.int2"
" AND NOT a.attisdropped"
" AND a.attrelid = %u"
@@ -686,7 +688,7 @@ fetch_remote_table_info(char *nspname, char *relname,
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
- nspname, relname);
+ nspname, relname);
ExecClearTuple(slot);
}
@@ -707,9 +709,9 @@ static void
copy_table(Relation rel)
{
LogicalRepRelMapEntry *relmapentry;
- LogicalRepRelation lrel;
- WalRcvExecResult *res;
- StringInfoData cmd;
+ LogicalRepRelation lrel;
+ WalRcvExecResult *res;
+ StringInfoData cmd;
CopyState cstate;
List *attnamelist;
ParseState *pstate;
@@ -759,8 +761,8 @@ copy_table(Relation rel)
char *
LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
{
- char *slotname;
- char *err;
+ char *slotname;
+ char *err;
char relstate;
XLogRecPtr relstate_lsn;
@@ -783,7 +785,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* NAMEDATALEN on the remote that matters, but this scheme will also work
* reasonably if that is different.)
*/
- StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
+ StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
slotname = psprintf("%.*s_%u_sync_%u",
NAMEDATALEN - 28,
MySubscription->slotname,
@@ -801,7 +803,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
case SUBREL_STATE_DATASYNC:
{
Relation rel;
- WalRcvExecResult *res;
+ WalRcvExecResult *res;
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -818,24 +820,23 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
pgstat_report_stat(false);
/*
- * We want to do the table data sync in single
- * transaction.
+ * We want to do the table data sync in single transaction.
*/
StartTransactionCommand();
/*
* Use standard write lock here. It might be better to
- * disallow access to table while it's being synchronized.
- * But we don't want to block the main apply process from
- * working and it has to open relation in RowExclusiveLock
- * when remapping remote relation id to local one.
+ * disallow access to table while it's being synchronized. But
+ * we don't want to block the main apply process from working
+ * and it has to open relation in RowExclusiveLock when
+ * remapping remote relation id to local one.
*/
rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock);
/*
- * Create temporary slot for the sync process.
- * We do this inside transaction so that we can use the
- * snapshot made by the slot to get existing data.
+ * Create temporary slot for the sync process. We do this
+ * inside transaction so that we can use the snapshot made by
+ * the slot to get existing data.
*/
res = walrcv_exec(wrconn,
"BEGIN READ ONLY ISOLATION LEVEL "
@@ -849,10 +850,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Create new temporary logical decoding slot.
*
- * We'll use slot for data copy so make sure the snapshot
- * is used for the transaction, that way the COPY will get
- * data that is consistent with the lsn used by the slot
- * to start decoding.
+ * We'll use slot for data copy so make sure the snapshot is
+ * used for the transaction, that way the COPY will get data
+ * that is consistent with the lsn used by the slot to start
+ * decoding.
*/
walrcv_create_slot(wrconn, slotname, true,
CRS_USE_SNAPSHOT, origin_startpos);
@@ -872,8 +873,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
CommandCounterIncrement();
/*
- * We are done with the initial data synchronization,
- * update the state.
+ * We are done with the initial data synchronization, update
+ * the state.
*/
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
@@ -881,8 +882,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
- * Wait for main apply worker to either tell us to
- * catchup or that we are done.
+ * Wait for main apply worker to either tell us to catchup or
+ * that we are done.
*/
wait_for_sync_status_change(MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate);