aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/decode.c77
-rw-r--r--src/backend/replication/logical/logical.c135
-rw-r--r--src/backend/replication/logical/logicalfuncs.c25
-rw-r--r--src/backend/replication/logical/reorderbuffer.c82
-rw-r--r--src/backend/replication/logical/snapbuild.c138
5 files changed, 244 insertions, 213 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 414cfa95586..7b6114a2097 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -9,12 +9,12 @@
*
* NOTE:
* This basically tries to handle all low level xlog stuff for
- * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
- * specific record's struct is used to pass data along, but those just
- * happen to contain the right amount of data in a convenient
- * format. There isn't and shouldn't be much intelligence about the
- * contents of records in here except turning them into a more usable
- * format.
+ * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
+ * specific record's struct is used to pass data along, but those just
+ * happen to contain the right amount of data in a convenient
+ * format. There isn't and shouldn't be much intelligence about the
+ * contents of records in here except turning them into a more usable
+ * format.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -44,10 +44,10 @@
typedef struct XLogRecordBuffer
{
- XLogRecPtr origptr;
- XLogRecPtr endptr;
- XLogRecord record;
- char *record_data;
+ XLogRecPtr origptr;
+ XLogRecPtr endptr;
+ XLogRecord record;
+ char *record_data;
} XLogRecordBuffer;
/* RMGR Handlers */
@@ -63,10 +63,10 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
+ TransactionId xid, Oid dboid,
+ TimestampTz commit_time,
+ int nsubxacts, TransactionId *sub_xids,
+ int ninval_msgs, SharedInvalidationMessage *msg);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
TransactionId xid, TransactionId *sub_xids, int nsubxacts);
@@ -91,10 +91,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
/* cast so we get a warning when new rmgrs are added */
switch ((RmgrIds) buf.record.xl_rmid)
{
- /*
- * Rmgrs we care about for logical decoding. Add new rmgrs in
- * rmgrlist.h's order.
- */
+ /*
+ * Rmgrs we care about for logical decoding. Add new rmgrs in
+ * rmgrlist.h's order.
+ */
case RM_XLOG_ID:
DecodeXLogOp(ctx, &buf);
break;
@@ -115,11 +115,11 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
DecodeHeapOp(ctx, &buf);
break;
- /*
- * Rmgrs irrelevant for logical decoding; they describe stuff not
- * represented in logical decoding. Add new rmgrs in rmgrlist.h's
- * order.
- */
+ /*
+ * Rmgrs irrelevant for logical decoding; they describe stuff not
+ * represented in logical decoding. Add new rmgrs in rmgrlist.h's
+ * order.
+ */
case RM_SMGR_ID:
case RM_CLOG_ID:
case RM_DBASE_ID:
@@ -149,13 +149,14 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
switch (info)
{
- /* this is also used in END_OF_RECOVERY checkpoints */
+ /* this is also used in END_OF_RECOVERY checkpoints */
case XLOG_CHECKPOINT_SHUTDOWN:
case XLOG_END_OF_RECOVERY:
SnapBuildSerializationPoint(builder, buf->origptr);
break;
case XLOG_CHECKPOINT_ONLINE:
+
/*
* a RUNNING_XACTS record will have been logged near to this, we
* can restart from there.
@@ -181,9 +182,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- SnapBuild *builder = ctx->snapshot_builder;
- ReorderBuffer *reorder = ctx->reorder;
- XLogRecord *r = &buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBuffer *reorder = ctx->reorder;
+ XLogRecord *r = &buf->record;
uint8 info = r->xl_info & ~XLR_INFO_MASK;
/* no point in doing anything yet, data could not be decoded anyway */
@@ -280,7 +281,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
int i;
TransactionId *sub_xid;
- xlrec = (xl_xact_assignment *) buf->record_data;
+ xlrec = (xl_xact_assignment *) buf->record_data;
sub_xid = &xlrec->xsub[0];
@@ -292,6 +293,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
}
case XLOG_XACT_PREPARE:
+
/*
* Currently decoding ignores PREPARE TRANSACTION and will just
* decode the transaction when the COMMIT PREPARED is sent or
@@ -321,7 +323,9 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_RUNNING_XACTS:
{
xl_running_xacts *running = (xl_running_xacts *) buf->record_data;
+
SnapBuildProcessRunningXacts(builder, buf->origptr, running);
+
/*
* Abort all transactions that we keep track of, that are
* older than the record's oldestRunningXid. This is the most
@@ -364,22 +368,25 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_HEAP2_NEW_CID:
{
xl_heap_new_cid *xlrec;
+
xlrec = (xl_heap_new_cid *) buf->record_data;
SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
break;
}
case XLOG_HEAP2_REWRITE:
+
/*
* Although these records only exist to serve the needs of logical
* decoding, all the work happens as part of crash or archive
* recovery, so we don't need to do anything here.
*/
break;
- /*
- * Everything else here is just low level physical stuff we're
- * not interested in.
- */
+
+ /*
+ * Everything else here is just low level physical stuff we're not
+ * interested in.
+ */
case XLOG_HEAP2_FREEZE_PAGE:
case XLOG_HEAP2_CLEAN:
case XLOG_HEAP2_CLEANUP_INFO:
@@ -429,6 +436,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_NEWPAGE:
+
/*
* This is only used in places like indexams and CLUSTER which
* don't contain changes relevant for logical replication.
@@ -436,6 +444,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_INPLACE:
+
/*
* Inplace updates are only ever performed on catalog tuples and
* can, per definition, not change tuple visibility. Since we
@@ -503,8 +512,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* There basically two reasons we might not be interested in this
* transaction:
* 1) We might not be interested in decoding transactions up to this
- * LSN. This can happen because we previously decoded it and now just
- * are restarting or if we haven't assembled a consistent snapshot yet.
+ * LSN. This can happen because we previously decoded it and now just
+ * are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
*
* We can't just use ReorderBufferAbort() here, because we need to execute
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d08b50da39..438a3fb152d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -8,21 +8,21 @@
* src/backend/replication/logical/logical.c
*
* NOTES
- * This file coordinates interaction between the various modules that
- * together provide logical decoding, primarily by providing so
- * called LogicalDecodingContexts. The goal is to encapsulate most of the
- * internal complexity for consumers of logical decoding, so they can
- * create and consume a changestream with a low amount of code. Builtin
- * consumers are the walsender and SQL SRF interface, but it's possible to
- * add further ones without changing core code, e.g. to consume changes in
- * a bgworker.
+ * This file coordinates interaction between the various modules that
+ * together provide logical decoding, primarily by providing so
+ * called LogicalDecodingContexts. The goal is to encapsulate most of the
+ * internal complexity for consumers of logical decoding, so they can
+ * create and consume a changestream with a low amount of code. Builtin
+ * consumers are the walsender and SQL SRF interface, but it's possible to
+ * add further ones without changing core code, e.g. to consume changes in
+ * a bgworker.
*
- * The idea is that a consumer provides three callbacks, one to read WAL,
- * one to prepare a data write, and a final one for actually writing since
- * their implementation depends on the type of consumer. Check
- * logicalfuncs.c for an example implementation of a fairly simple consumer
- * and a implementation of a WAL reading callback that's suitable for
- * simple consumers.
+ * The idea is that a consumer provides three callbacks, one to read WAL,
+ * one to prepare a data write, and a final one for actually writing since
+ * their implementation depends on the type of consumer. Check
+ * logicalfuncs.c for an example implementation of a fairly simple consumer
+ * and a implementation of a WAL reading callback that's suitable for
+ * simple consumers.
*-------------------------------------------------------------------------
*/
@@ -56,13 +56,13 @@ typedef struct LogicalErrorCallbackState
/* wrappers around output plugin callbacks */
static void output_plugin_error_callback(void *arg);
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
- bool is_init);
+ bool is_init);
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn);
+ XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- Relation relation, ReorderBufferChange *change);
+ Relation relation, ReorderBufferChange *change);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
@@ -90,18 +90,18 @@ CheckLogicalDecodingRequirements(void)
*
* There's basically three things missing to allow this:
* 1) We need to be able to correctly and quickly identify the timeline a
- * LSN belongs to
+ * LSN belongs to
* 2) We need to force hot_standby_feedback to be enabled at all times so
- * the primary cannot remove rows we need.
+ * the primary cannot remove rows we need.
* 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
+ * dbase_redo. There can't be any active ones due to HS recovery
+ * conflicts, so that should be relatively easy.
* ----
*/
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while in recovery")));
+ errmsg("logical decoding cannot be used while in recovery")));
}
/*
@@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options,
LogicalOutputPluginWriterWrite do_write)
{
ReplicationSlot *slot;
- MemoryContext context, old_context;
+ MemoryContext context,
+ old_context;
LogicalDecodingContext *ctx;
/* shorter lines... */
@@ -133,7 +134,10 @@ StartupDecodingContext(List *output_plugin_options,
ctx->context = context;
- /* (re-)load output plugins, so we detect a bad (removed) output plugin now. */
+ /*
+ * (re-)load output plugins, so we detect a bad (removed) output plugin
+ * now.
+ */
LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
/*
@@ -195,10 +199,10 @@ CreateInitDecodingContext(char *plugin,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
{
- TransactionId xmin_horizon = InvalidTransactionId;
+ TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
LogicalDecodingContext *ctx;
- MemoryContext old_context;
+ MemoryContext old_context;
/* shorter lines... */
slot = MyReplicationSlot;
@@ -219,8 +223,8 @@ CreateInitDecodingContext(char *plugin,
if (slot->data.database != MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" was not created in this database",
- NameStr(slot->data.name))));
+ errmsg("replication slot \"%s\" was not created in this database",
+ NameStr(slot->data.name))));
if (IsTransactionState() &&
GetTopTransactionIdIfAny() != InvalidTransactionId)
@@ -252,9 +256,9 @@ CreateInitDecodingContext(char *plugin,
*/
if (!RecoveryInProgress())
{
- XLogRecPtr flushptr;
+ XLogRecPtr flushptr;
- /* start at current insert position*/
+ /* start at current insert position */
slot->data.restart_lsn = GetXLogInsertRecPtr();
/* make sure we have enough information to start */
@@ -307,8 +311,8 @@ CreateInitDecodingContext(char *plugin,
LWLockRelease(ProcArrayLock);
/*
- * tell the snapshot builder to only assemble snapshot once reaching
- * the a running_xact's record with the respective xmin.
+ * tell the snapshot builder to only assemble snapshot once reaching the a
+ * running_xact's record with the respective xmin.
*/
xmin_horizon = slot->data.catalog_xmin;
@@ -316,7 +320,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- read_page, prepare_write, do_write);
+ read_page, prepare_write, do_write);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -352,7 +356,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
- MemoryContext old_context;
+ MemoryContext old_context;
/* shorter lines... */
slot = MyReplicationSlot;
@@ -370,8 +374,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
if (slot->data.database != MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- (errmsg("replication slot \"%s\" was not created in this database",
- NameStr(slot->data.name)))));
+ (errmsg("replication slot \"%s\" was not created in this database",
+ NameStr(slot->data.name)))));
if (start_lsn == InvalidXLogRecPtr)
{
@@ -385,14 +389,14 @@ CreateDecodingContext(XLogRecPtr start_lsn,
* pretty common for a client to acknowledge a LSN it doesn't have to
* do anything for, and thus didn't store persistently, because the
* xlog records didn't result in anything relevant for logical
- * decoding. Clients have to be able to do that to support
- * synchronous replication.
+ * decoding. Clients have to be able to do that to support synchronous
+ * replication.
*/
start_lsn = slot->data.confirmed_flush;
elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
- (uint32)(start_lsn >> 32), (uint32)start_lsn,
- (uint32)(slot->data.confirmed_flush >> 32),
- (uint32)slot->data.confirmed_flush);
+ (uint32) (start_lsn >> 32), (uint32) start_lsn,
+ (uint32) (slot->data.confirmed_flush >> 32),
+ (uint32) slot->data.confirmed_flush);
}
ctx = StartupDecodingContext(output_plugin_options,
@@ -409,10 +413,10 @@ CreateDecodingContext(XLogRecPtr start_lsn,
(errmsg("starting logical decoding for slot %s",
NameStr(slot->data.name)),
errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X",
- (uint32)(slot->data.confirmed_flush >> 32),
- (uint32)slot->data.confirmed_flush,
- (uint32)(slot->data.restart_lsn >> 32),
- (uint32)slot->data.restart_lsn)));
+ (uint32) (slot->data.confirmed_flush >> 32),
+ (uint32) slot->data.confirmed_flush,
+ (uint32) (slot->data.restart_lsn >> 32),
+ (uint32) slot->data.restart_lsn)));
return ctx;
}
@@ -438,8 +442,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
startptr = ctx->slot->data.restart_lsn;
elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
- (uint32)(ctx->slot->data.restart_lsn >> 32),
- (uint32)ctx->slot->data.restart_lsn);
+ (uint32) (ctx->slot->data.restart_lsn >> 32),
+ (uint32) ctx->slot->data.restart_lsn);
/* Wait for a consistent starting point */
for (;;)
@@ -543,14 +547,15 @@ static void
output_plugin_error_callback(void *arg)
{
LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
+
/* not all callbacks have an associated LSN */
if (state->report_location != InvalidXLogRecPtr)
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
state->callback_name,
- (uint32)(state->report_location >> 32),
- (uint32)state->report_location);
+ (uint32) (state->report_location >> 32),
+ (uint32) state->report_location);
else
errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
NameStr(state->ctx->slot->data.name),
@@ -643,7 +648,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
static void
commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr commit_lsn)
+ XLogRecPtr commit_lsn)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
@@ -652,7 +657,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "commit";
- state.report_location = txn->final_lsn; /* beginning of commit record */
+ state.report_location = txn->final_lsn; /* beginning of commit record */
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
@@ -672,7 +677,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void
change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- Relation relation, ReorderBufferChange *change)
+ Relation relation, ReorderBufferChange *change)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
@@ -690,6 +695,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
+
/*
* report this change's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
@@ -715,7 +721,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
void
LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
{
- bool updated_xmin = false;
+ bool updated_xmin = false;
ReplicationSlot *slot;
slot = MyReplicationSlot;
@@ -725,16 +731,17 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
SpinLockAcquire(&slot->mutex);
/*
- * don't overwrite if we already have a newer xmin. This can
- * happen if we restart decoding in a slot.
+ * don't overwrite if we already have a newer xmin. This can happen if we
+ * restart decoding in a slot.
*/
if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
{
}
+
/*
- * If the client has already confirmed up to this lsn, we directly
- * can mark this as accepted. This can happen if we restart
- * decoding in a slot.
+ * If the client has already confirmed up to this lsn, we directly can
+ * mark this as accepted. This can happen if we restart decoding in a
+ * slot.
*/
else if (current_lsn <= slot->data.confirmed_flush)
{
@@ -744,6 +751,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
/* our candidate can directly be used */
updated_xmin = true;
}
+
/*
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly.
@@ -770,7 +778,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
void
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
{
- bool updated_lsn = false;
+ bool updated_lsn = false;
ReplicationSlot *slot;
slot = MyReplicationSlot;
@@ -781,13 +789,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
SpinLockAcquire(&slot->mutex);
- /* don't overwrite if have a newer restart lsn*/
+ /* don't overwrite if have a newer restart lsn */
if (restart_lsn <= slot->data.restart_lsn)
{
}
+
/*
- * We might have already flushed far enough to directly accept this lsn, in
- * this case there is no need to check for existing candidate LSNs
+ * We might have already flushed far enough to directly accept this lsn,
+ * in this case there is no need to check for existing candidate LSNs
*/
else if (current_lsn <= slot->data.confirmed_flush)
{
@@ -797,6 +806,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
/* our candidate can directly be used */
updated_lsn = true;
}
+
/*
* Only increase if the previous values have been applied, otherwise we
* might never end up updating if the receiver acks too slowly. A missed
@@ -896,6 +906,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
}
+
/*
* Now the new xmin is safely on disk, we can let the global value
* advance. We do not take ProcArrayLock or similar since we only
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 5fa1848001d..2da6bb10b22 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -42,11 +42,12 @@
#include "storage/fd.h"
/* private date for writing out data */
-typedef struct DecodingOutputState {
+typedef struct DecodingOutputState
+{
Tuplestorestate *tupstore;
- TupleDesc tupdesc;
- bool binary_output;
- int64 returned_rows;
+ TupleDesc tupdesc;
+ bool binary_output;
+ int64 returned_rows;
} DecodingOutputState;
/*
@@ -91,7 +92,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
/* ick, but cstring_to_text_with_len works for bytea perfectly fine */
values[2] = PointerGetDatum(
- cstring_to_text_with_len(ctx->out->data, ctx->out->len));
+ cstring_to_text_with_len(ctx->out->data, ctx->out->len));
tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
p->returned_rows++;
@@ -412,7 +413,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
InvalidateSystemCaches();
while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) ||
- (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
+ (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal))
{
XLogRecord *record;
char *errm = NULL;
@@ -474,7 +475,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
Datum
pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false);
+ Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false);
+
return ret;
}
@@ -484,7 +486,8 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
Datum
pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false);
+ Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false);
+
return ret;
}
@@ -494,7 +497,8 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
Datum
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true);
+ Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true);
+
return ret;
}
@@ -504,6 +508,7 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
Datum
pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true);
+ Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true);
+
return ret;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index a2b2adb1732..7f2bbca302e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -60,7 +60,7 @@
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
-#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
+#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
@@ -582,7 +582,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
*/
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
- ReorderBufferChange *change)
+ ReorderBufferChange *change)
{
ReorderBufferTXN *txn;
@@ -1047,8 +1047,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Cleanup the tuplecids we stored for decoding catalog snapshot
- * access. They are always stored in the toplevel transaction.
+ * Cleanup the tuplecids we stored for decoding catalog snapshot access.
+ * They are always stored in the toplevel transaction.
*/
dlist_foreach_modify(iter, &txn->tuplecids)
{
@@ -1204,9 +1204,9 @@ ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
snap->subxip[i++] = txn->xid;
/*
- * nsubxcnt isn't decreased when subtransactions abort, so count
- * manually. Since it's an upper boundary it is safe to use it for the
- * allocation above.
+ * nsubxcnt isn't decreased when subtransactions abort, so count manually.
+ * Since it's an upper boundary it is safe to use it for the allocation
+ * above.
*/
snap->subxcnt = 1;
@@ -1262,10 +1262,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferIterTXNState *iterstate = NULL;
ReorderBufferChange *change;
- volatile CommandId command_id = FirstCommandId;
- volatile Snapshot snapshot_now = NULL;
- volatile bool txn_started = false;
- volatile bool subtxn_started = false;
+ volatile CommandId command_id = FirstCommandId;
+ volatile Snapshot snapshot_now = NULL;
+ volatile bool txn_started = false;
+ volatile bool subtxn_started = false;
txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
false);
@@ -1309,8 +1309,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
/*
* Decoding needs access to syscaches et al., which in turn use
- * heavyweight locks and such. Thus we need to have enough state around
- * to keep track of those. The easiest way is to simply use a
+ * heavyweight locks and such. Thus we need to have enough state
+ * around to keep track of those. The easiest way is to simply use a
* transaction internally. That also allows us to easily enforce that
* nothing writes to the database by checking for xid assignments.
*
@@ -1344,7 +1344,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
Assert(snapshot_now);
reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
- change->data.tp.relnode.relNode);
+ change->data.tp.relnode.relNode);
/*
* Catalog tuple without data, emitted while catalog was
@@ -1415,6 +1415,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferCopySnap(rb, change->data.snapshot,
txn, command_id);
}
+
/*
* Restored from disk, need to be careful not to double
* free. We could introduce refcounting for that, but for
@@ -1447,7 +1448,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
{
/* we don't use the global one anymore */
snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
- txn, command_id);
+ txn, command_id);
}
snapshot_now->curcid = command_id;
@@ -1586,7 +1587,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
*/
dlist_foreach_modify(it, &rb->toplevel_by_lsn)
{
- ReorderBufferTXN * txn;
+ ReorderBufferTXN *txn;
txn = dlist_container(ReorderBufferTXN, node, it.cur);
@@ -1998,7 +1999,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_DELETE:
{
char *data;
- ReorderBufferTupleBuf *oldtup, *newtup;
+ ReorderBufferTupleBuf *oldtup,
+ *newtup;
Size oldlen = 0;
Size newlen = 0;
@@ -2007,12 +2009,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldtup)
oldlen = offsetof(ReorderBufferTupleBuf, data)
- + oldtup->tuple.t_len
+ +oldtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
if (newtup)
newlen = offsetof(ReorderBufferTupleBuf, data)
- + newtup->tuple.t_len
+ +newtup->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
sz += oldlen;
@@ -2188,7 +2190,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
else if (readBytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from reorderbuffer spill file: %m")));
+ errmsg("could not read from reorderbuffer spill file: %m")));
else if (readBytes != sizeof(ReorderBufferDiskChange))
ereport(ERROR,
(errcode_for_file_access(),
@@ -2199,7 +2201,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
ReorderBufferSerializeReserve(rb,
- sizeof(ReorderBufferDiskChange) + ondisk->size);
+ sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
@@ -2208,13 +2210,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (readBytes < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from reorderbuffer spill file: %m")));
+ errmsg("could not read from reorderbuffer spill file: %m")));
else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
readBytes,
- (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
+ (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
/*
* ok, read a full change from disk, now restore it into proper
@@ -2364,7 +2366,7 @@ StartupReorderBuffer(void)
logical_dir = AllocateDir("pg_replslot");
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
{
- struct stat statbuf;
+ struct stat statbuf;
char path[MAXPGPATH];
if (strcmp(logical_de->d_name, ".") == 0 ||
@@ -2620,7 +2622,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
cchange = dlist_container(ReorderBufferChange, node, it.cur);
ctup = cchange->data.tp.newtuple;
chunk = DatumGetPointer(
- fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
+ fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
Assert(!isnull);
Assert(!VARATT_IS_EXTERNAL(chunk));
@@ -2800,7 +2802,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
ReorderBufferTupleCidKey key;
ReorderBufferTupleCidEnt *ent;
ReorderBufferTupleCidEnt *new_ent;
- bool found;
+ bool found;
/* be careful about padding */
memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
@@ -2813,7 +2815,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m",
path)));
- else if (readBytes == 0) /* EOF */
+ else if (readBytes == 0) /* EOF */
break;
else if (readBytes != sizeof(LogicalRewriteMappingData))
ereport(ERROR,
@@ -2884,8 +2886,8 @@ TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
static int
file_sort_by_lsn(const void *a_p, const void *b_p)
{
- RewriteMappingFile *a = *(RewriteMappingFile **)a_p;
- RewriteMappingFile *b = *(RewriteMappingFile **)b_p;
+ RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
+ RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
if (a->lsn < b->lsn)
return -1;
@@ -2912,19 +2914,20 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
mapping_dir = AllocateDir("pg_llog/mappings");
while ((mapping_de = ReadDir(mapping_dir, "pg_llog/mappings")) != NULL)
{
- Oid f_dboid;
- Oid f_relid;
- TransactionId f_mapped_xid;
- TransactionId f_create_xid;
- XLogRecPtr f_lsn;
- uint32 f_hi, f_lo;
+ Oid f_dboid;
+ Oid f_relid;
+ TransactionId f_mapped_xid;
+ TransactionId f_create_xid;
+ XLogRecPtr f_lsn;
+ uint32 f_hi,
+ f_lo;
RewriteMappingFile *f;
if (strcmp(mapping_de->d_name, ".") == 0 ||
strcmp(mapping_de->d_name, "..") == 0)
continue;
- /* Ignore files that aren't ours*/
+ /* Ignore files that aren't ours */
if (strncmp(mapping_de->d_name, "map-", 4) != 0)
continue;
@@ -2971,11 +2974,12 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
file_sort_by_lsn);
- for(off = 0; off < list_length(files); off++)
+ for (off = 0; off < list_length(files); off++)
{
RewriteMappingFile *f = files_a[off];
+
elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
- snapshot->subxip[0]);
+ snapshot->subxip[0]);
ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
pfree(f);
}
@@ -2995,7 +2999,7 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
ReorderBufferTupleCidEnt *ent;
ForkNumber forkno;
BlockNumber blockno;
- bool updated_mapping = false;
+ bool updated_mapping = false;
/* be careful about padding */
memset(&key, 0, sizeof(key));
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 36034dbec9d..cb45f906fc1 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -57,27 +57,27 @@
*
* The snapbuild machinery is starting up in several stages, as illustrated
* by the following graph:
- * +-------------------------+
- * +----|SNAPBUILD_START |-------------+
- * | +-------------------------+ |
- * | | |
- * | | |
- * | running_xacts with running xacts |
- * | | |
- * | | |
- * | v |
- * | +-------------------------+ v
- * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
- * | +-------------------------+ |
- * running_xacts | saved snapshot
- * with zero xacts | at running_xacts's lsn
- * | | |
- * | all running toplevel TXNs finished |
- * | | |
- * | v |
- * | +-------------------------+ |
- * +--->|SNAPBUILD_CONSISTENT |<------------+
- * +-------------------------+
+ * +-------------------------+
+ * +----|SNAPBUILD_START |-------------+
+ * | +-------------------------+ |
+ * | | |
+ * | | |
+ * | running_xacts with running xacts |
+ * | | |
+ * | | |
+ * | v |
+ * | +-------------------------+ v
+ * | |SNAPBUILD_FULL_SNAPSHOT |------------>|
+ * | +-------------------------+ |
+ * running_xacts | saved snapshot
+ * with zero xacts | at running_xacts's lsn
+ * | | |
+ * | all running toplevel TXNs finished |
+ * | | |
+ * | v |
+ * | +-------------------------+ |
+ * +--->|SNAPBUILD_CONSISTENT |<------------+
+ * +-------------------------+
*
* Initially the machinery is in the START stage. When a xl_running_xacts
* record is read that is sufficiently new (above the safe xmin horizon),
@@ -184,7 +184,7 @@ struct SnapBuild
* Information about initially running transactions
*
* When we start building a snapshot there already may be transactions in
- * progress. Those are stored in running.xip. We don't have enough
+ * progress. Those are stored in running.xip. We don't have enough
* information about those to decode their contents, so until they are
* finished (xcnt=0) we cannot switch to a CONSISTENT state.
*/
@@ -244,7 +244,7 @@ struct SnapBuild
* removes knowledge about the previously used resowner, so we save it here.
*/
ResourceOwner SavedResourceOwnerDuringExport = NULL;
-bool ExportInProgress = false;
+bool ExportInProgress = false;
/* transaction state manipulation functions */
static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
@@ -496,7 +496,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid)
snapshot->copied = false;
snapshot->curcid = FirstCommandId;
snapshot->active_count = 0;
- snapshot->regd_count = 1; /* mark as registered so nobody frees it */
+ snapshot->regd_count = 1; /* mark as registered so nobody frees it */
return snapshot;
}
@@ -635,7 +635,7 @@ SnapBuildClearExportedSnapshot()
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
+ bool is_old_tx;
/*
* We can't handle data in transactions if we haven't built a snapshot
@@ -692,10 +692,10 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
CommandId cid;
/*
- * we only log new_cid's if a catalog tuple was modified, so mark
- * the transaction as containing catalog modifications
+ * we only log new_cid's if a catalog tuple was modified, so mark the
+ * transaction as containing catalog modifications
*/
- ReorderBufferXidSetCatalogChanges(builder->reorder, xid,lsn);
+ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
xlrec->target.node, xlrec->target.tid,
@@ -712,7 +712,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
cid = xlrec->cmin;
else
{
- cid = InvalidCommandId; /* silence compiler */
+ cid = InvalidCommandId; /* silence compiler */
elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
}
@@ -818,7 +818,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
(uint32) builder->committed.xcnt_space);
builder->committed.xip = repalloc(builder->committed.xip,
- builder->committed.xcnt_space * sizeof(TransactionId));
+ builder->committed.xcnt_space * sizeof(TransactionId));
}
/*
@@ -900,10 +900,10 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
* so our incrementaly built snapshot now is consistent.
*/
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("xid %u finished, no running transactions anymore",
- xid)));
+ (errmsg("logical decoding found consistent point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("xid %u finished, no running transactions anymore",
+ xid)));
builder->state = SNAPBUILD_CONSISTENT;
}
}
@@ -1170,15 +1170,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
*/
if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
+
/*
* No in-progress transaction, can reuse the last serialized snapshot if
* we have one.
*/
else if (txn == NULL &&
- builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
+ builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
builder->last_serialized_snapshot != InvalidXLogRecPtr)
LogicalIncreaseRestartDecodingForSlot(lsn,
- builder->last_serialized_snapshot);
+ builder->last_serialized_snapshot);
}
@@ -1199,23 +1200,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* the currently running transactions. There are several ways to do that:
*
* a) There were no running transactions when the xl_running_xacts record
- * was inserted, jump to CONSISTENT immediately. We might find such a
- * state we were waiting for b) and c).
+ * was inserted, jump to CONSISTENT immediately. We might find such a
+ * state we were waiting for b) and c).
*
* b) Wait for all toplevel transactions that were running to end. We
- * simply track the number of in-progress toplevel transactions and
- * lower it whenever one commits or aborts. When that number
- * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
- * to CONSISTENT.
+ * simply track the number of in-progress toplevel transactions and
+ * lower it whenever one commits or aborts. When that number
+ * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT
+ * to CONSISTENT.
* NB: We need to search running.xip when seeing a transaction's end to
- * make sure it's a toplevel transaction and it's been one of the
- * intially running ones.
+ * make sure it's a toplevel transaction and it's been one of the
+ * intially running ones.
* Interestingly, in contrast to HS, this allows us not to care about
* subtransactions - and by extension suboverflowed xl_running_xacts -
* at all.
*
* c) This (in a previous run) or another decoding slot serialized a
- * snapshot to disk that we can use.
+ * snapshot to disk that we can use.
* ---
*/
@@ -1231,7 +1232,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
(errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
(uint32) (lsn >> 32), (uint32) lsn),
errdetail("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ builder->initial_xmin_horizon, running->oldestRunningXid)));
return true;
}
@@ -1263,7 +1264,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
+ (uint32) (lsn >> 32), (uint32) lsn),
errdetail("running xacts with xcnt == 0")));
return false;
@@ -1274,15 +1275,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
/* there won't be any state to cleanup */
return false;
}
+
/*
* b) first encounter of a useable xl_running_xacts record. If we had
- * found one earlier we would either track running transactions
- * (i.e. builder->running.xcnt != 0) or be consistent (this function
- * wouldn't get called).
+ * found one earlier we would either track running transactions (i.e.
+ * builder->running.xcnt != 0) or be consistent (this function wouldn't
+ * get called).
*/
else if (!builder->running.xcnt)
{
- int off;
+ int off;
/*
* We only care about toplevel xids as those are the ones we
@@ -1302,7 +1304,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->running.xcnt_space = running->xcnt;
builder->running.xip =
MemoryContextAlloc(builder->context,
- builder->running.xcnt * sizeof(TransactionId));
+ builder->running.xcnt * sizeof(TransactionId));
memcpy(builder->running.xip, running->xids,
builder->running.xcnt * sizeof(TransactionId));
@@ -1320,9 +1322,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->state = SNAPBUILD_FULL_SNAPSHOT;
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
- errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));
+ (errmsg("logical decoding found initial starting point at %X/%X",
+ (uint32) (lsn >> 32), (uint32) lsn),
+ errdetail("%u xacts need to finish", (uint32) builder->running.xcnt)));
/*
* Iterate through all xids, wait for them to finish.
@@ -1331,7 +1333,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* isolationtester to notice that we're currently waiting for
* something.
*/
- for(off = 0; off < builder->running.xcnt; off++)
+ for (off = 0; off < builder->running.xcnt; off++)
{
TransactionId xid = builder->running.xip[off];
@@ -1471,9 +1473,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
* but remember location, so we don't need to read old data again.
*
* To be sure it has been synced to disk after the rename() from the
- * tempfile filename to the real filename, we just repeat the
- * fsync. That ought to be cheap because in most scenarios it should
- * already be safely on disk.
+ * tempfile filename to the real filename, we just repeat the fsync.
+ * That ought to be cheap because in most scenarios it should already
+ * be safely on disk.
*/
fsync_fname(path, false);
fsync_fname("pg_llog/snapshots", true);
@@ -1504,7 +1506,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
if (unlink(tmppath) != 0 && errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not unlink file \"%s\": %m", path)));
+ errmsg("could not unlink file \"%s\": %m", path)));
needed_length = sizeof(SnapBuildOnDisk) +
sizeof(TransactionId) * builder->running.xcnt_space +
@@ -1518,7 +1520,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
INIT_CRC32(ondisk->checksum);
COMP_CRC32(ondisk->checksum,
((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
- SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+ SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
ondisk_c += sizeof(SnapBuildOnDisk);
memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
@@ -1597,8 +1599,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
fsync_fname("pg_llog/snapshots", true);
/*
- * Now there's no way we can loose the dumped state anymore, remember
- * this as a serialization point.
+ * Now there's no way we can loose the dumped state anymore, remember this
+ * as a serialization point.
*/
builder->last_serialized_snapshot = lsn;
@@ -1673,7 +1675,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
INIT_CRC32(checksum);
COMP_CRC32(checksum,
((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
- SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
+ SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
@@ -1781,7 +1783,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
ereport(LOG,
(errmsg("logical decoding found consistent point at %X/%X",
- (uint32)(lsn >> 32), (uint32)lsn),
+ (uint32) (lsn >> 32), (uint32) lsn),
errdetail("found initial snapshot in snapbuild file")));
return true;
@@ -1829,7 +1831,7 @@ CheckPointSnapBuild(void)
uint32 hi;
uint32 lo;
XLogRecPtr lsn;
- struct stat statbuf;
+ struct stat statbuf;
if (strcmp(snap_de->d_name, ".") == 0 ||
strcmp(snap_de->d_name, "..") == 0)
@@ -1846,8 +1848,8 @@ CheckPointSnapBuild(void)
/*
* temporary filenames from SnapBuildSerialize() include the LSN and
* everything but are postfixed by .$pid.tmp. We can just remove them
- * the same as other files because there can be none that are currently
- * being written that are older than cutoff.
+ * the same as other files because there can be none that are
+ * currently being written that are older than cutoff.
*
* We just log a message if a file doesn't fit the pattern, it's
* probably some editors lock/state file or similar...