diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 145 |
1 files changed, 73 insertions, 72 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 7e51076b376..1e3753b8fe2 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -113,7 +113,8 @@ StringInfo copybuf = NULL; /* * Exit routine for synchronization worker. */ -static void pg_attribute_noreturn() +static void +pg_attribute_noreturn() finish_sync_worker(void) { /* @@ -148,12 +149,12 @@ finish_sync_worker(void) static bool wait_for_sync_status_change(Oid relid, char origstate) { - int rc; - char state = origstate; + int rc; + char state = origstate; while (!got_SIGTERM) { - LogicalRepWorker *worker; + LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, @@ -269,7 +270,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) struct tablesync_start_time_mapping { Oid relid; - TimestampTz last_start_time; + TimestampTz last_start_time; }; static List *table_states = NIL; static HTAB *last_start_times = NULL; @@ -281,9 +282,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* We need up to date sync state info for subscription tables here. */ if (!table_states_valid) { - MemoryContext oldctx; - List *rstates; - ListCell *lc; + MemoryContext oldctx; + List *rstates; + ListCell *lc; SubscriptionRelState *rstate; /* Clean the old list. */ @@ -294,7 +295,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; /* Fetch all non-ready tables. */ - rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); /* Allocate the tracking info in a permanent memory context. */ oldctx = MemoryContextSwitchTo(CacheMemoryContext); @@ -324,6 +325,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = hash_create("Logical replication table sync worker start times", 256, &ctl, HASH_ELEM | HASH_BLOBS); } + /* * Clean up the hash table when we're done with all tables (just to * release the bit of memory). @@ -337,14 +339,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* Process all tables that are being synchronized. */ foreach(lc, table_states) { - SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc); + SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); if (rstate->state == SUBREL_STATE_SYNCDONE) { /* - * Apply has caught up to the position where the table sync - * has finished. Time to mark the table as ready so that - * apply will just continue to replicate it normally. + * Apply has caught up to the position where the table sync has + * finished. Time to mark the table as ready so that apply will + * just continue to replicate it normally. */ if (current_lsn >= rstate->lsn) { @@ -362,8 +364,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } else { - LogicalRepWorker *syncworker; - int nsyncworkers = 0; + LogicalRepWorker *syncworker; + int nsyncworkers = 0; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, @@ -376,6 +378,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) SpinLockRelease(&syncworker->relmutex); } else + /* * If no sync worker for this table yet, count running sync * workers for this subscription, while we have the lock, for @@ -394,16 +397,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * There are three possible synchronization situations here. * * a) Apply is in front of the table sync: We tell the table - * sync to CATCHUP. + * sync to CATCHUP. * * b) Apply is behind the table sync: We tell the table sync - * to mark the table as SYNCDONE and finish. - + * to mark the table as SYNCDONE and finish. + * * c) Apply and table sync are at the same position: We tell - * table sync to mark the table as READY and finish. + * table sync to mark the table as READY and finish. * - * In any case we'll need to wait for table sync to change - * the state in catalog and only then continue ourselves. + * In any case we'll need to wait for table sync to change the + * state in catalog and only then continue ourselves. */ if (current_lsn > rstate->lsn) { @@ -427,20 +430,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) logicalrep_worker_wakeup_ptr(syncworker); /* - * Enter busy loop and wait for synchronization status - * change. + * Enter busy loop and wait for synchronization status change. */ wait_for_sync_status_change(rstate->relid, rstate->state); } /* - * If there is no sync worker registered for the table and - * there is some free sync worker slot, start new sync worker - * for the table. + * If there is no sync worker registered for the table and there + * is some free sync worker slot, start new sync worker for the + * table. */ else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) { - TimestampTz now = GetCurrentTimestamp(); + TimestampTz now = GetCurrentTimestamp(); struct tablesync_start_time_mapping *hentry; bool found; @@ -492,7 +494,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) for (i = 0; i < desc->natts; i++) { - int remoteattnum = rel->attrmap[i]; + int remoteattnum = rel->attrmap[i]; /* Skip dropped attributes. */ if (desc->attrs[i]->attisdropped) @@ -503,7 +505,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) continue; attnamelist = lappend(attnamelist, - makeString(rel->remoterel.attnames[remoteattnum])); + makeString(rel->remoterel.attnames[remoteattnum])); } return attnamelist; @@ -516,8 +518,8 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel) static int copy_read_data(void *outbuf, int minread, int maxread) { - int bytesread = 0; - int avail; + int bytesread = 0; + int avail; /* If there are some leftover data from previous read, use them. */ avail = copybuf->len - copybuf->cursor; @@ -601,13 +603,13 @@ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel) { - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; - Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; - bool isnull; - int natt; + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {OIDOID, CHAROID}; + Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; + bool isnull; + int natt; lrel->nspname = nspname; lrel->relname = relname; @@ -615,14 +617,14 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" - " FROM pg_catalog.pg_class c" - " INNER JOIN pg_catalog.pg_namespace n" - " ON (c.relnamespace = n.oid)" - " WHERE n.nspname = %s" - " AND c.relname = %s" - " AND c.relkind = 'r'", - quote_literal_cstr(nspname), - quote_literal_cstr(relname)); + " FROM pg_catalog.pg_class c" + " INNER JOIN pg_catalog.pg_namespace n" + " ON (c.relnamespace = n.oid)" + " WHERE n.nspname = %s" + " AND c.relname = %s" + " AND c.relkind = 'r'", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); res = walrcv_exec(wrconn, cmd.data, 2, tableRow); if (res->status != WALRCV_OK_TUPLES) @@ -653,7 +655,7 @@ fetch_remote_table_info(char *nspname, char *relname, " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" - " ON (i.indexrelid = pg_get_replica_identity_index(%u))" + " ON (i.indexrelid = pg_get_replica_identity_index(%u))" " WHERE a.attnum > 0::pg_catalog.int2" " AND NOT a.attisdropped" " AND a.attrelid = %u" @@ -686,7 +688,7 @@ fetch_remote_table_info(char *nspname, char *relname, /* Should never happen. */ if (++natt >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", - nspname, relname); + nspname, relname); ExecClearTuple(slot); } @@ -707,9 +709,9 @@ static void copy_table(Relation rel) { LogicalRepRelMapEntry *relmapentry; - LogicalRepRelation lrel; - WalRcvExecResult *res; - StringInfoData cmd; + LogicalRepRelation lrel; + WalRcvExecResult *res; + StringInfoData cmd; CopyState cstate; List *attnamelist; ParseState *pstate; @@ -759,8 +761,8 @@ copy_table(Relation rel) char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { - char *slotname; - char *err; + char *slotname; + char *err; char relstate; XLogRecPtr relstate_lsn; @@ -783,7 +785,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * NAMEDATALEN on the remote that matters, but this scheme will also work * reasonably if that is different.) */ - StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ + StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ slotname = psprintf("%.*s_%u_sync_%u", NAMEDATALEN - 28, MySubscription->slotname, @@ -801,7 +803,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) case SUBREL_STATE_DATASYNC: { Relation rel; - WalRcvExecResult *res; + WalRcvExecResult *res; SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; @@ -818,24 +820,23 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) pgstat_report_stat(false); /* - * We want to do the table data sync in single - * transaction. + * We want to do the table data sync in single transaction. */ StartTransactionCommand(); /* * Use standard write lock here. It might be better to - * disallow access to table while it's being synchronized. - * But we don't want to block the main apply process from - * working and it has to open relation in RowExclusiveLock - * when remapping remote relation id to local one. + * disallow access to table while it's being synchronized. But + * we don't want to block the main apply process from working + * and it has to open relation in RowExclusiveLock when + * remapping remote relation id to local one. */ rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); /* - * Create temporary slot for the sync process. - * We do this inside transaction so that we can use the - * snapshot made by the slot to get existing data. + * Create temporary slot for the sync process. We do this + * inside transaction so that we can use the snapshot made by + * the slot to get existing data. */ res = walrcv_exec(wrconn, "BEGIN READ ONLY ISOLATION LEVEL " @@ -849,10 +850,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* * Create new temporary logical decoding slot. * - * We'll use slot for data copy so make sure the snapshot - * is used for the transaction, that way the COPY will get - * data that is consistent with the lsn used by the slot - * to start decoding. + * We'll use slot for data copy so make sure the snapshot is + * used for the transaction, that way the COPY will get data + * that is consistent with the lsn used by the slot to start + * decoding. */ walrcv_create_slot(wrconn, slotname, true, CRS_USE_SNAPSHOT, origin_startpos); @@ -872,8 +873,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommandCounterIncrement(); /* - * We are done with the initial data synchronization, - * update the state. + * We are done with the initial data synchronization, update + * the state. */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; @@ -881,8 +882,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SpinLockRelease(&MyLogicalRepWorker->relmutex); /* - * Wait for main apply worker to either tell us to - * catchup or that we are done. + * Wait for main apply worker to either tell us to catchup or + * that we are done. */ wait_for_sync_status_change(MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate); |