diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 82 |
1 files changed, 43 insertions, 39 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a2b2adb1732..7f2bbca302e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -60,7 +60,7 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" -#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ +#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/sinval.h" @@ -582,7 +582,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, */ void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, - ReorderBufferChange *change) + ReorderBufferChange *change) { ReorderBufferTXN *txn; @@ -1047,8 +1047,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Cleanup the tuplecids we stored for decoding catalog snapshot - * access. They are always stored in the toplevel transaction. + * Cleanup the tuplecids we stored for decoding catalog snapshot access. + * They are always stored in the toplevel transaction. */ dlist_foreach_modify(iter, &txn->tuplecids) { @@ -1204,9 +1204,9 @@ ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, snap->subxip[i++] = txn->xid; /* - * nsubxcnt isn't decreased when subtransactions abort, so count - * manually. Since it's an upper boundary it is safe to use it for the - * allocation above. + * nsubxcnt isn't decreased when subtransactions abort, so count manually. + * Since it's an upper boundary it is safe to use it for the allocation + * above. */ snap->subxcnt = 1; @@ -1262,10 +1262,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNState *iterstate = NULL; ReorderBufferChange *change; - volatile CommandId command_id = FirstCommandId; - volatile Snapshot snapshot_now = NULL; - volatile bool txn_started = false; - volatile bool subtxn_started = false; + volatile CommandId command_id = FirstCommandId; + volatile Snapshot snapshot_now = NULL; + volatile bool txn_started = false; + volatile bool subtxn_started = false; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -1309,8 +1309,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* * Decoding needs access to syscaches et al., which in turn use - * heavyweight locks and such. Thus we need to have enough state around - * to keep track of those. The easiest way is to simply use a + * heavyweight locks and such. Thus we need to have enough state + * around to keep track of those. The easiest way is to simply use a * transaction internally. That also allows us to easily enforce that * nothing writes to the database by checking for xid assignments. * @@ -1344,7 +1344,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Assert(snapshot_now); reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, - change->data.tp.relnode.relNode); + change->data.tp.relnode.relNode); /* * Catalog tuple without data, emitted while catalog was @@ -1415,6 +1415,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } + /* * Restored from disk, need to be careful not to double * free. We could introduce refcounting for that, but for @@ -1447,7 +1448,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, { /* we don't use the global one anymore */ snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, - txn, command_id); + txn, command_id); } snapshot_now->curcid = command_id; @@ -1586,7 +1587,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) */ dlist_foreach_modify(it, &rb->toplevel_by_lsn) { - ReorderBufferTXN * txn; + ReorderBufferTXN *txn; txn = dlist_container(ReorderBufferTXN, node, it.cur); @@ -1998,7 +1999,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: { char *data; - ReorderBufferTupleBuf *oldtup, *newtup; + ReorderBufferTupleBuf *oldtup, + *newtup; Size oldlen = 0; Size newlen = 0; @@ -2007,12 +2009,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldtup) oldlen = offsetof(ReorderBufferTupleBuf, data) - + oldtup->tuple.t_len + +oldtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); if (newtup) newlen = offsetof(ReorderBufferTupleBuf, data) - + newtup->tuple.t_len + +newtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); sz += oldlen; @@ -2188,7 +2190,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, else if (readBytes < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); + errmsg("could not read from reorderbuffer spill file: %m"))); else if (readBytes != sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), @@ -2199,7 +2201,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ondisk = (ReorderBufferDiskChange *) rb->outbuf; ReorderBufferSerializeReserve(rb, - sizeof(ReorderBufferDiskChange) + ondisk->size); + sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), @@ -2208,13 +2210,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, if (readBytes < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); + errmsg("could not read from reorderbuffer spill file: %m"))); else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", readBytes, - (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); + (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); /* * ok, read a full change from disk, now restore it into proper @@ -2364,7 +2366,7 @@ StartupReorderBuffer(void) logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { - struct stat statbuf; + struct stat statbuf; char path[MAXPGPATH]; if (strcmp(logical_de->d_name, ".") == 0 || @@ -2620,7 +2622,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, cchange = dlist_container(ReorderBufferChange, node, it.cur); ctup = cchange->data.tp.newtuple; chunk = DatumGetPointer( - fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); + fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); Assert(!isnull); Assert(!VARATT_IS_EXTERNAL(chunk)); @@ -2800,7 +2802,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) ReorderBufferTupleCidKey key; ReorderBufferTupleCidEnt *ent; ReorderBufferTupleCidEnt *new_ent; - bool found; + bool found; /* be careful about padding */ memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); @@ -2813,7 +2815,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", path))); - else if (readBytes == 0) /* EOF */ + else if (readBytes == 0) /* EOF */ break; else if (readBytes != sizeof(LogicalRewriteMappingData)) ereport(ERROR, @@ -2884,8 +2886,8 @@ TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num) static int file_sort_by_lsn(const void *a_p, const void *b_p) { - RewriteMappingFile *a = *(RewriteMappingFile **)a_p; - RewriteMappingFile *b = *(RewriteMappingFile **)b_p; + RewriteMappingFile *a = *(RewriteMappingFile **) a_p; + RewriteMappingFile *b = *(RewriteMappingFile **) b_p; if (a->lsn < b->lsn) return -1; @@ -2912,19 +2914,20 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) mapping_dir = AllocateDir("pg_llog/mappings"); while ((mapping_de = ReadDir(mapping_dir, "pg_llog/mappings")) != NULL) { - Oid f_dboid; - Oid f_relid; - TransactionId f_mapped_xid; - TransactionId f_create_xid; - XLogRecPtr f_lsn; - uint32 f_hi, f_lo; + Oid f_dboid; + Oid f_relid; + TransactionId f_mapped_xid; + TransactionId f_create_xid; + XLogRecPtr f_lsn; + uint32 f_hi, + f_lo; RewriteMappingFile *f; if (strcmp(mapping_de->d_name, ".") == 0 || strcmp(mapping_de->d_name, "..") == 0) continue; - /* Ignore files that aren't ours*/ + /* Ignore files that aren't ours */ if (strncmp(mapping_de->d_name, "map-", 4) != 0) continue; @@ -2971,11 +2974,12 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) qsort(files_a, list_length(files), sizeof(RewriteMappingFile *), file_sort_by_lsn); - for(off = 0; off < list_length(files); off++) + for (off = 0; off < list_length(files); off++) { RewriteMappingFile *f = files_a[off]; + elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname, - snapshot->subxip[0]); + snapshot->subxip[0]); ApplyLogicalMappingFile(tuplecid_data, relid, f->fname); pfree(f); } @@ -2995,7 +2999,7 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, ReorderBufferTupleCidEnt *ent; ForkNumber forkno; BlockNumber blockno; - bool updated_mapping = false; + bool updated_mapping = false; /* be careful about padding */ memset(&key, 0, sizeof(key)); |