aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c82
1 files changed, 43 insertions, 39 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index a2b2adb1732..7f2bbca302e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -60,7 +60,7 @@
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
-#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
+#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
@@ -582,7 +582,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
*/
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
- ReorderBufferChange *change)
+ ReorderBufferChange *change)
{
ReorderBufferTXN *txn;
@@ -1047,8 +1047,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Cleanup the tuplecids we stored for decoding catalog snapshot
- * access. They are always stored in the toplevel transaction.
+ * Cleanup the tuplecids we stored for decoding catalog snapshot access.
+ * They are always stored in the toplevel transaction.
*/
dlist_foreach_modify(iter, &txn->tuplecids)
{
@@ -1204,9 +1204,9 @@ ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
snap->subxip[i++] = txn->xid;
/*
- * nsubxcnt isn't decreased when subtransactions abort, so count
- * manually. Since it's an upper boundary it is safe to use it for the
- * allocation above.
+ * nsubxcnt isn't decreased when subtransactions abort, so count manually.
+ * Since it's an upper boundary it is safe to use it for the allocation
+ * above.
*/
snap->subxcnt = 1;
@@ -1262,10 +1262,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferIterTXNState *iterstate = NULL;
ReorderBufferChange *change;
- volatile CommandId command_id = FirstCommandId;
- volatile Snapshot snapshot_now = NULL;
- volatile bool txn_started = false;
- volatile bool subtxn_started = false;
+ volatile CommandId command_id = FirstCommandId;
+ volatile Snapshot snapshot_now = NULL;
+ volatile bool txn_started = false;
+ volatile bool subtxn_started = false;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
@@ -1309,8 +1309,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
/*
* Decoding needs access to syscaches et al., which in turn use
- * heavyweight locks and such. Thus we need to have enough state around
- * to keep track of those. The easiest way is to simply use a
+ * heavyweight locks and such. Thus we need to have enough state
+ * around to keep track of those. The easiest way is to simply use a
* transaction internally. That also allows us to easily enforce that
* nothing writes to the database by checking for xid assignments.
*
@@ -1344,7 +1344,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
Assert(snapshot_now);
reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
- change->data.tp.relnode.relNode);
+ change->data.tp.relnode.relNode);
/*
* Catalog tuple without data, emitted while catalog was
@@ -1415,6 +1415,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferCopySnap(rb, change->data.snapshot,
txn, command_id);
}
+
/*
* Restored from disk, need to be careful not to double
* free. We could introduce refcounting for that, but for
@@ -1447,7 +1448,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
{
/* we don't use the global one anymore */
snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
- txn, command_id);
+ txn, command_id);
}
snapshot_now->curcid = command_id;
@@ -1586,7 +1587,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
*/
dlist_foreach_modify(it, &rb->toplevel_by_lsn)
{
- ReorderBufferTXN * txn;
+ ReorderBufferTXN *txn;
txn = dlist_container(ReorderBufferTXN, node, it.cur);
@@ -1998,7 +1999,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_DELETE:
{
char *data;
- ReorderBufferTupleBuf *oldtup, *newtup;
+ ReorderBufferTupleBuf *oldtup,
+ *newtup;
Size oldlen = 0;
Size newlen = 0;
@@ -2007,12 +2009,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldtup)
oldlen = offsetof(ReorderBufferTupleBuf, data)
- + oldtup->tuple.t_len
+ +oldtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
if (newtup)
newlen = offsetof(ReorderBufferTupleBuf, data)
- + newtup->tuple.t_len
+ +newtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
sz += oldlen;
@@ -2188,7 +2190,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
else if (readBytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from reorderbuffer spill file: %m")));
+ errmsg("could not read from reorderbuffer spill file: %m")));
else if (readBytes != sizeof(ReorderBufferDiskChange))
ereport(ERROR,
(errcode_for_file_access(),
@@ -2199,7 +2201,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
ReorderBufferSerializeReserve(rb,
- sizeof(ReorderBufferDiskChange) + ondisk->size);
+ sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
@@ -2208,13 +2210,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (readBytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from reorderbuffer spill file: %m")));
+ errmsg("could not read from reorderbuffer spill file: %m")));
else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
readBytes,
- (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
+ (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
/*
* ok, read a full change from disk, now restore it into proper
@@ -2364,7 +2366,7 @@ StartupReorderBuffer(void)
logical_dir = AllocateDir("pg_replslot");
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
{
- struct stat statbuf;
+ struct stat statbuf;
char path[MAXPGPATH];
if (strcmp(logical_de->d_name, ".") == 0 ||
@@ -2620,7 +2622,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
cchange = dlist_container(ReorderBufferChange, node, it.cur);
ctup = cchange->data.tp.newtuple;
chunk = DatumGetPointer(
- fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
+ fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
Assert(!isnull);
Assert(!VARATT_IS_EXTERNAL(chunk));
@@ -2800,7 +2802,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
ReorderBufferTupleCidKey key;
ReorderBufferTupleCidEnt *ent;
ReorderBufferTupleCidEnt *new_ent;
- bool found;
+ bool found;
/* be careful about padding */
memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
@@ -2813,7 +2815,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m",
path)));
- else if (readBytes == 0) /* EOF */
+ else if (readBytes == 0) /* EOF */
break;
else if (readBytes != sizeof(LogicalRewriteMappingData))
ereport(ERROR,
@@ -2884,8 +2886,8 @@ TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
static int
file_sort_by_lsn(const void *a_p, const void *b_p)
{
- RewriteMappingFile *a = *(RewriteMappingFile **)a_p;
- RewriteMappingFile *b = *(RewriteMappingFile **)b_p;
+ RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
+ RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
if (a->lsn < b->lsn)
return -1;
@@ -2912,19 +2914,20 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
mapping_dir = AllocateDir("pg_llog/mappings");
while ((mapping_de = ReadDir(mapping_dir, "pg_llog/mappings")) != NULL)
{
- Oid f_dboid;
- Oid f_relid;
- TransactionId f_mapped_xid;
- TransactionId f_create_xid;
- XLogRecPtr f_lsn;
- uint32 f_hi, f_lo;
+ Oid f_dboid;
+ Oid f_relid;
+ TransactionId f_mapped_xid;
+ TransactionId f_create_xid;
+ XLogRecPtr f_lsn;
+ uint32 f_hi,
+ f_lo;
RewriteMappingFile *f;
if (strcmp(mapping_de->d_name, ".") == 0 ||
strcmp(mapping_de->d_name, "..") == 0)
continue;
- /* Ignore files that aren't ours*/
+ /* Ignore files that aren't ours */
if (strncmp(mapping_de->d_name, "map-", 4) != 0)
continue;
@@ -2971,11 +2974,12 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
file_sort_by_lsn);
- for(off = 0; off < list_length(files); off++)
+ for (off = 0; off < list_length(files); off++)
{
RewriteMappingFile *f = files_a[off];
+
elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
- snapshot->subxip[0]);
+ snapshot->subxip[0]);
ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
pfree(f);
}
@@ -2995,7 +2999,7 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
ReorderBufferTupleCidEnt *ent;
ForkNumber forkno;
BlockNumber blockno;
- bool updated_mapping = false;
+ bool updated_mapping = false;
/* be careful about padding */
memset(&key, 0, sizeof(key));