aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c1429
1 files changed, 1429 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
new file mode 100644
index 00000000000..7d86736444b
--- /dev/null
+++ b/src/backend/replication/logical/worker.c
@@ -0,0 +1,1429 @@
+/*-------------------------------------------------------------------------
+ * worker.c
+ * PostgreSQL logical replication worker (apply)
+ *
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/worker.c
+ *
+ * NOTES
+ * This file contains the worker which applies logical changes as they come
+ * from remote logical replication stream.
+ *
+ * The main worker (apply) is started by logical replication worker
+ * launcher for every enabled subscription in a database. It uses
+ * walsender protocol to communicate with publisher.
+ *
+ * The apply worker may spawn additional workers (sync) for initial data
+ * synchronization of tables.
+ *
+ * This module includes server facing code and shares libpqwalreceiver
+ * module with walreceiver for providing the libpq specific functionality.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "funcapi.h"
+
+#include "access/xact.h"
+#include "access/xlog_internal.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_subscription.h"
+
+#include "commands/trigger.h"
+
+#include "executor/executor.h"
+#include "executor/nodeModifyTable.h"
+
+#include "libpq/pqformat.h"
+#include "libpq/pqsignal.h"
+
+#include "mb/pg_wchar.h"
+
+#include "nodes/makefuncs.h"
+
+#include "optimizer/planner.h"
+
+#include "parser/parse_relation.h"
+
+#include "postmaster/bgworker.h"
+#include "postmaster/postmaster.h"
+
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
+#include "replication/reorderbuffer.h"
+#include "replication/origin.h"
+#include "replication/snapbuild.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+
+#include "rewrite/rewriteHandler.h"
+
+#include "storage/bufmgr.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/datum.h"
+#include "utils/fmgroids.h"
+#include "utils/guc.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/timeout.h"
+#include "utils/tqual.h"
+#include "utils/syscache.h"
+
+#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
+
+typedef struct FlushPosition
+{
+ dlist_node node;
+ XLogRecPtr local_end;
+ XLogRecPtr remote_end;
+} FlushPosition;
+
+static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
+
+typedef struct SlotErrCallbackArg
+{
+ LogicalRepRelation *rel;
+ int attnum;
+} SlotErrCallbackArg;
+
+static MemoryContext ApplyContext = NULL;
+static MemoryContext ApplyCacheContext = NULL;
+
+WalReceiverConn *wrconn = NULL;
+
+Subscription *MySubscription = NULL;
+bool MySubscriptionValid = false;
+
+bool in_remote_transaction = false;
+
+static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+
+static void store_flush_position(XLogRecPtr remote_lsn);
+
+static void reread_subscription(void);
+
+/*
+ * Make sure that we started local transaction.
+ *
+ * Also switches to ApplyContext as necessary.
+ */
+static bool
+ensure_transaction(void)
+{
+ if (IsTransactionState())
+ {
+ if (CurrentMemoryContext != ApplyContext)
+ MemoryContextSwitchTo(ApplyContext);
+ return false;
+ }
+
+ StartTransactionCommand();
+
+ if (!MySubscriptionValid)
+ reread_subscription();
+
+ MemoryContextSwitchTo(ApplyContext);
+ return true;
+}
+
+
+/*
+ * Executor state preparation for evaluation of constraint expressions,
+ * indexes and triggers.
+ *
+ * This is based on similar code in copy.c
+ */
+static EState *
+create_estate_for_relation(LogicalRepRelMapEntry *rel)
+{
+ EState *estate;
+ ResultRelInfo *resultRelInfo;
+ RangeTblEntry *rte;
+
+ estate = CreateExecutorState();
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel->localrel);
+ rte->relkind = rel->localrel->rd_rel->relkind;
+ estate->es_range_table = list_make1(rte);
+
+ resultRelInfo = makeNode(ResultRelInfo);
+ InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+
+ /* Triggers might need a slot */
+ if (resultRelInfo->ri_TrigDesc)
+ estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+
+ return estate;
+}
+
+/*
+ * Executes default values for columns for which we can't map to remote
+ * relation columns.
+ *
+ * This allows us to support tables which have more columns on the downstream
+ * than on the upstream.
+ */
+static void
+slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
+ TupleTableSlot *slot)
+{
+ TupleDesc desc = RelationGetDescr(rel->localrel);
+ int num_phys_attrs = desc->natts;
+ int i;
+ int attnum,
+ num_defaults = 0;
+ int *defmap;
+ ExprState **defexprs;
+ ExprContext *econtext;
+
+ econtext = GetPerTupleExprContext(estate);
+
+ /* We got all the data via replication, no need to evaluate anything. */
+ if (num_phys_attrs == rel->remoterel.natts)
+ return;
+
+ defmap = (int *) palloc(num_phys_attrs * sizeof(int));
+ defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
+
+ for (attnum = 0; attnum < num_phys_attrs; attnum++)
+ {
+ Expr *defexpr;
+
+ if (desc->attrs[attnum]->attisdropped)
+ continue;
+
+ if (rel->attrmap[attnum] >= 0)
+ continue;
+
+ defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
+
+ if (defexpr != NULL)
+ {
+ /* Run the expression through planner */
+ defexpr = expression_planner(defexpr);
+
+ /* Initialize executable expression in copycontext */
+ defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
+ defmap[num_defaults] = attnum;
+ num_defaults++;
+ }
+
+ }
+
+ for (i = 0; i < num_defaults; i++)
+ slot->tts_values[defmap[i]] =
+ ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
+}
+
+/*
+ * Error callback to give more context info about type conversion failure.
+ */
+static void
+slot_store_error_callback(void *arg)
+{
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ Oid remotetypoid,
+ localtypoid;
+
+ if (errarg->attnum < 0)
+ return;
+
+ remotetypoid = errarg->rel->atttyps[errarg->attnum];
+ localtypoid = logicalrep_typmap_getid(remotetypoid);
+ errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
+ "remote type %s, local type %s",
+ errarg->rel->nspname, errarg->rel->relname,
+ errarg->rel->attnames[errarg->attnum],
+ format_type_be(remotetypoid),
+ format_type_be(localtypoid));
+}
+
+/*
+ * Store data in C string form into slot.
+ * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
+ * use better.
+ */
+static void
+slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+ char **values)
+{
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
+
+ ExecClearTuple(slot);
+
+ /* Push callback + info on the error context stack */
+ errarg.rel = &rel->remoterel;
+ errarg.attnum = -1;
+ errcallback.callback = slot_store_error_callback;
+ errcallback.arg = (void *) &errarg;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Call the "in" function for each non-dropped attribute */
+ for (i = 0; i < natts; i++)
+ {
+ Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+ int remoteattnum = rel->attrmap[i];
+
+ if (!att->attisdropped && remoteattnum >= 0 &&
+ values[remoteattnum] != NULL)
+ {
+ Oid typinput;
+ Oid typioparam;
+
+ errarg.attnum = remoteattnum;
+
+ getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+ slot->tts_values[i] = OidInputFunctionCall(typinput,
+ values[remoteattnum],
+ typioparam,
+ att->atttypmod);
+ slot->tts_isnull[i] = false;
+ }
+ else
+ {
+ /*
+ * We assign NULL to dropped attributes, NULL values, and missing
+ * values (missing values should be later filled using
+ * slot_fill_defaults).
+ */
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
+ }
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ ExecStoreVirtualTuple(slot);
+}
+
+/*
+ * Modify slot with user data provided as C strigs.
+ * This is somewhat similar to heap_modify_tuple but also calls the type
+ * input fuction on the user data as the input is the text representation
+ * of the types.
+ */
+static void
+slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+ char **values, bool *replaces)
+{
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
+
+ slot_getallattrs(slot);
+ ExecClearTuple(slot);
+
+ /* Push callback + info on the error context stack */
+ errarg.rel = &rel->remoterel;
+ errarg.attnum = -1;
+ errcallback.callback = slot_store_error_callback;
+ errcallback.arg = (void *) &errarg;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Call the "in" function for each replaced attribute */
+ for (i = 0; i < natts; i++)
+ {
+ Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+ int remoteattnum = rel->attrmap[i];
+
+ if (remoteattnum >= 0 && !replaces[remoteattnum])
+ continue;
+
+ if (remoteattnum >= 0 && values[remoteattnum] != NULL)
+ {
+ Oid typinput;
+ Oid typioparam;
+
+ errarg.attnum = remoteattnum;
+
+ getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+ slot->tts_values[i] = OidInputFunctionCall(typinput, values[i],
+ typioparam,
+ att->atttypmod);
+ slot->tts_isnull[i] = false;
+ }
+ else
+ {
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
+ }
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ ExecStoreVirtualTuple(slot);
+}
+
+/*
+ * Handle BEGIN message.
+ */
+static void
+apply_handle_begin(StringInfo s)
+{
+ LogicalRepBeginData begin_data;
+
+ logicalrep_read_begin(s, &begin_data);
+
+ replorigin_session_origin_timestamp = begin_data.committime;
+ replorigin_session_origin_lsn = begin_data.final_lsn;
+
+ in_remote_transaction = true;
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle COMMIT message.
+ *
+ * TODO, support tracking of multiple origins
+ */
+static void
+apply_handle_commit(StringInfo s)
+{
+ LogicalRepCommitData commit_data;
+
+ logicalrep_read_commit(s, &commit_data);
+
+ Assert(commit_data.commit_lsn == replorigin_session_origin_lsn);
+ Assert(commit_data.committime == replorigin_session_origin_timestamp);
+
+ if (IsTransactionState())
+ {
+ CommitTransactionCommand();
+
+ store_flush_position(commit_data.end_lsn);
+ }
+
+ in_remote_transaction = false;
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle ORIGIN message.
+ *
+ * TODO, support tracking of multiple origins
+ */
+static void
+apply_handle_origin(StringInfo s)
+{
+ /*
+ * ORIGIN message can only come inside remote transaction and before
+ * any actual writes.
+ */
+ if (!in_remote_transaction || IsTransactionState())
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("ORIGIN message sent out of order")));
+}
+
+/*
+ * Handle RELATION message.
+ *
+ * Note we don't do validation against local schema here. The validation
+ * against local schema is postponed until first change for given relation
+ * comes as we only care about it when applying changes for it anyway and we
+ * do less locking this way.
+ */
+static void
+apply_handle_relation(StringInfo s)
+{
+ LogicalRepRelation *rel;
+
+ rel = logicalrep_read_rel(s);
+ logicalrep_relmap_update(rel);
+}
+
+/*
+ * Handle TYPE message.
+ *
+ * Note we don't do local mapping here, that's done when the type is
+ * actually used.
+ */
+static void
+apply_handle_type(StringInfo s)
+{
+ LogicalRepTyp typ;
+
+ logicalrep_read_typ(s, &typ);
+ logicalrep_typmap_update(&typ);
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+static Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+ Oid idxoid;
+
+ idxoid = RelationGetReplicaIndex(rel);
+
+ if (!OidIsValid(idxoid))
+ idxoid = RelationGetPrimaryKeyIndex(rel);
+
+ return idxoid;
+}
+
+/*
+ * Handle INSERT message.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+ LogicalRepRelMapEntry *rel;
+ LogicalRepTupleData newtup;
+ LogicalRepRelId relid;
+ EState *estate;
+ TupleTableSlot *remoteslot;
+ MemoryContext oldctx;
+
+ ensure_transaction();
+
+ relid = logicalrep_read_insert(s, &newtup);
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+ /* Initialize the executor state. */
+ estate = create_estate_for_relation(rel);
+ remoteslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+
+ /* Process and store remote tuple in the slot */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_store_cstrings(remoteslot, rel, newtup.values);
+ slot_fill_defaults(rel, estate, remoteslot);
+ MemoryContextSwitchTo(oldctx);
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+ ExecOpenIndices(estate->es_result_relation_info, false);
+
+ /* Do the insert. */
+ ExecSimpleRelationInsert(estate, remoteslot);
+
+ /* Cleanup. */
+ ExecCloseIndices(estate->es_result_relation_info);
+ PopActiveSnapshot();
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ CommandCounterIncrement();
+}
+
+/*
+ * Check if the logical replication relation is updatable and throw
+ * appropriate error if it isn't.
+ */
+static void
+check_relation_updatable(LogicalRepRelMapEntry *rel)
+{
+ /* Updatable, no error. */
+ if (rel->updatable)
+ return;
+
+ /*
+ * We are in error mode so it's fine this is somewhat slow.
+ * It's better to give user correct error.
+ */
+ if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publisher does not send replica identity column "
+ "expected by the logical replication target relation \"%s.%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname)));
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" has "
+ "neither REPLICA IDENTIY index nor PRIMARY "
+ "KEY and published relation does not have "
+ "REPLICA IDENTITY FULL",
+ rel->remoterel.nspname, rel->remoterel.relname)));
+}
+
+/*
+ * Handle UPDATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_update(StringInfo s)
+{
+ LogicalRepRelMapEntry *rel;
+ LogicalRepRelId relid;
+ Oid idxoid;
+ EState *estate;
+ EPQState epqstate;
+ LogicalRepTupleData oldtup;
+ LogicalRepTupleData newtup;
+ bool has_oldtup;
+ TupleTableSlot *localslot;
+ TupleTableSlot *remoteslot;
+ bool found;
+ MemoryContext oldctx;
+
+ ensure_transaction();
+
+ relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
+ &newtup);
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+ /* Check if we can do the update. */
+ check_relation_updatable(rel);
+
+ /* Initialize the executor state. */
+ estate = create_estate_for_relation(rel);
+ remoteslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+ localslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+ ExecOpenIndices(estate->es_result_relation_info, false);
+
+ /* Build the search tuple. */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_store_cstrings(remoteslot, rel,
+ has_oldtup ? oldtup.values : newtup.values);
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Try to find tuple using either replica identity index, primary key
+ * or if needed, sequential scan.
+ */
+ idxoid = GetRelationIdentityOrPK(rel->localrel);
+ Assert(OidIsValid(idxoid) ||
+ (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
+
+ if (OidIsValid(idxoid))
+ found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+ LockTupleExclusive,
+ remoteslot, localslot);
+ else
+ found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+ remoteslot, localslot);
+
+ ExecClearTuple(remoteslot);
+
+ /*
+ * Tuple found.
+ *
+ * Note this will fail if there are other conflicting unique indexes.
+ */
+ if (found)
+ {
+ /* Process and store remote tuple in the slot */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
+ slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
+ MemoryContextSwitchTo(oldctx);
+
+ EvalPlanQualSetSlot(&epqstate, remoteslot);
+
+ /* Do the actual update. */
+ ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
+ }
+ else
+ {
+ /*
+ * The tuple to be updated could not be found.
+ *
+ * TODO what to do here, change the log level to LOG perhaps?
+ */
+ elog(DEBUG1,
+ "logical replication did not find row for update "
+ "in replication target relation \"%s\"",
+ RelationGetRelationName(rel->localrel));
+ }
+
+ /* Cleanup. */
+ ExecCloseIndices(estate->es_result_relation_info);
+ PopActiveSnapshot();
+ EvalPlanQualEnd(&epqstate);
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ CommandCounterIncrement();
+}
+
+/*
+ * Handle DELETE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_delete(StringInfo s)
+{
+ LogicalRepRelMapEntry *rel;
+ LogicalRepTupleData oldtup;
+ LogicalRepRelId relid;
+ Oid idxoid;
+ EState *estate;
+ EPQState epqstate;
+ TupleTableSlot *remoteslot;
+ TupleTableSlot *localslot;
+ bool found;
+ MemoryContext oldctx;
+
+ ensure_transaction();
+
+ relid = logicalrep_read_delete(s, &oldtup);
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+ /* Check if we can do the delete. */
+ check_relation_updatable(rel);
+
+ /* Initialize the executor state. */
+ estate = create_estate_for_relation(rel);
+ remoteslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+ localslot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+ ExecOpenIndices(estate->es_result_relation_info, false);
+
+ /* Find the tuple using the replica identity index. */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_store_cstrings(remoteslot, rel, oldtup.values);
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Try to find tuple using either replica identity index, primary key
+ * or if needed, sequential scan.
+ */
+ idxoid = GetRelationIdentityOrPK(rel->localrel);
+ Assert(OidIsValid(idxoid) ||
+ (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
+
+ if (OidIsValid(idxoid))
+ found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+ LockTupleExclusive,
+ remoteslot, localslot);
+ else
+ found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+ remoteslot, localslot);
+ /* If found delete it. */
+ if (found)
+ {
+ EvalPlanQualSetSlot(&epqstate, localslot);
+
+ /* Do the actual delete. */
+ ExecSimpleRelationDelete(estate, &epqstate, localslot);
+ }
+ else
+ {
+ /* The tuple to be deleted could not be found.*/
+ ereport(DEBUG1,
+ (errmsg("logical replication could not find row for delete "
+ "in replication target %s",
+ RelationGetRelationName(rel->localrel))));
+ }
+
+ /* Cleanup. */
+ ExecCloseIndices(estate->es_result_relation_info);
+ PopActiveSnapshot();
+ EvalPlanQualEnd(&epqstate);
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ CommandCounterIncrement();
+}
+
+
+/*
+ * Logical replication protocol message dispatcher.
+ */
+static void
+apply_dispatch(StringInfo s)
+{
+ char action = pq_getmsgbyte(s);
+
+ switch (action)
+ {
+ /* BEGIN */
+ case 'B':
+ apply_handle_begin(s);
+ break;
+ /* COMMIT */
+ case 'C':
+ apply_handle_commit(s);
+ break;
+ /* INSERT */
+ case 'I':
+ apply_handle_insert(s);
+ break;
+ /* UPDATE */
+ case 'U':
+ apply_handle_update(s);
+ break;
+ /* DELETE */
+ case 'D':
+ apply_handle_delete(s);
+ break;
+ /* RELATION */
+ case 'R':
+ apply_handle_relation(s);
+ break;
+ /* TYPE */
+ case 'Y':
+ apply_handle_type(s);
+ break;
+ /* ORIGIN */
+ case 'O':
+ apply_handle_origin(s);
+ break;
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid logical replication message type %c", action)));
+ }
+}
+
+/*
+ * Figure out which write/flush positions to report to the walsender process.
+ *
+ * We can't simply report back the last LSN the walsender sent us because the
+ * local transaction might not yet be flushed to disk locally. Instead we
+ * build a list that associates local with remote LSNs for every commit. When
+ * reporting back the flush position to the sender we iterate that list and
+ * check which entries on it are already locally flushed. Those we can report
+ * as having been flushed.
+ *
+ * The have_pending_txes is true if there are outstanding transactions that
+ * need to be flushed.
+ */
+static void
+get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
+ bool *have_pending_txes)
+{
+ dlist_mutable_iter iter;
+ XLogRecPtr local_flush = GetFlushRecPtr();
+
+ *write = InvalidXLogRecPtr;
+ *flush = InvalidXLogRecPtr;
+
+ dlist_foreach_modify(iter, &lsn_mapping)
+ {
+ FlushPosition *pos =
+ dlist_container(FlushPosition, node, iter.cur);
+
+ *write = pos->remote_end;
+
+ if (pos->local_end <= local_flush)
+ {
+ *flush = pos->remote_end;
+ dlist_delete(iter.cur);
+ pfree(pos);
+ }
+ else
+ {
+ /*
+ * Don't want to uselessly iterate over the rest of the list which
+ * could potentially be long. Instead get the last element and
+ * grab the write position from there.
+ */
+ pos = dlist_tail_element(FlushPosition, node,
+ &lsn_mapping);
+ *write = pos->remote_end;
+ *have_pending_txes = true;
+ return;
+ }
+ }
+
+ *have_pending_txes = !dlist_is_empty(&lsn_mapping);
+}
+
+/*
+ * Store current remote/local lsn pair in the tracking list.
+ */
+static void
+store_flush_position(XLogRecPtr remote_lsn)
+{
+ FlushPosition *flushpos;
+
+ /* Need to do this in permanent context */
+ MemoryContextSwitchTo(ApplyCacheContext);
+
+ /* Track commit lsn */
+ flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
+ flushpos->local_end = XactLastCommitEnd;
+ flushpos->remote_end = remote_lsn;
+
+ dlist_push_tail(&lsn_mapping, &flushpos->node);
+ MemoryContextSwitchTo(ApplyContext);
+}
+
+
+/* Update statistics of the worker. */
+static void
+UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
+{
+ MyLogicalRepWorker->last_lsn = last_lsn;
+ MyLogicalRepWorker->last_send_time = send_time;
+ MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
+ if (reply)
+ {
+ MyLogicalRepWorker->reply_lsn = last_lsn;
+ MyLogicalRepWorker->reply_time = send_time;
+ }
+}
+
+/*
+ * Apply main loop.
+ */
+static void
+ApplyLoop(void)
+{
+ XLogRecPtr last_received = InvalidXLogRecPtr;
+
+ /* Init the ApplyContext which we use for easier cleanup. */
+ ApplyContext = AllocSetContextCreate(TopMemoryContext,
+ "ApplyContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* mark as idle, before starting to loop */
+ pgstat_report_activity(STATE_IDLE, NULL);
+
+ while (!got_SIGTERM)
+ {
+ pgsocket fd = PGINVALID_SOCKET;
+ int rc;
+ int len;
+ char *buf = NULL;
+ bool endofstream = false;
+ TimestampTz last_recv_timestamp = GetCurrentTimestamp();
+ bool ping_sent = false;
+
+ MemoryContextSwitchTo(ApplyContext);
+
+ len = walrcv_receive(wrconn, &buf, &fd);
+
+ if (len != 0)
+ {
+ /* Process the data */
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
+
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyContext);
+
+ s.data = buf;
+ s.len = len;
+ s.cursor = 0;
+ s.maxlen = -1;
+
+ c = pq_getmsgbyte(&s);
+
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
+
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time =
+ IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
+
+ if (last_received < start_lsn)
+ last_received = start_lsn;
+
+ if (last_received < end_lsn)
+ last_received = end_lsn;
+
+ UpdateWorkerStats(last_received, send_time, false);
+
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr endpos;
+ TimestampTz timestamp;
+ bool reply_requested;
+
+ endpos = pq_getmsgint64(&s);
+ timestamp =
+ IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
+ reply_requested = pq_getmsgbyte(&s);
+
+ send_feedback(endpos, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
+ }
+ /* other message types are purposefully ignored */
+ }
+
+ len = walrcv_receive(wrconn, &buf, &fd);
+ }
+ }
+
+ if (!in_remote_transaction)
+ {
+ /*
+ * If we didn't get any transactions for a while there might be
+ * unconsumed invalidation messages in the queue, consume them now.
+ */
+ StartTransactionCommand();
+ /* Check for subscription change */
+ if (!MySubscriptionValid)
+ reread_subscription();
+ CommitTransactionCommand();
+ }
+
+ /* confirm all writes at once */
+ send_feedback(last_received, false, false);
+
+ /* Cleanup the memory. */
+ MemoryContextResetAndDeleteChildren(ApplyContext);
+ MemoryContextSwitchTo(TopMemoryContext);
+
+ /* Check if we need to exit the streaming loop. */
+ if (endofstream)
+ break;
+
+ /*
+ * Wait for more data or latch.
+ */
+ rc = WaitLatchOrSocket(&MyProc->procLatch,
+ WL_SOCKET_READABLE | WL_LATCH_SET |
+ WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ fd, NAPTIME_PER_CYCLE,
+ WAIT_EVENT_LOGICAL_APPLY_MAIN);
+
+ /* Emergency bailout if postmaster has died */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ if (rc & WL_TIMEOUT)
+ {
+ /*
+ * We didn't receive anything new. If we haven't heard
+ * anything from the server for more than
+ * wal_receiver_timeout / 2, ping the server. Also, if
+ * it's been longer than wal_receiver_status_interval
+ * since the last update we sent, send a status update to
+ * the master anyway, to report any progress in applying
+ * WAL.
+ */
+ bool requestReply = false;
+
+ /*
+ * Check if time since last receive from standby has
+ * reached the configured limit.
+ */
+ if (wal_receiver_timeout > 0)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz timeout;
+
+ timeout =
+ TimestampTzPlusMilliseconds(last_recv_timestamp,
+ wal_receiver_timeout);
+
+ if (now >= timeout)
+ ereport(ERROR,
+ (errmsg("terminating logical replication worker due to timeout")));
+
+ /*
+ * We didn't receive anything new, for half of
+ * receiver replication timeout. Ping the server.
+ */
+ if (!ping_sent)
+ {
+ timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+ (wal_receiver_timeout / 2));
+ if (now >= timeout)
+ {
+ requestReply = true;
+ ping_sent = true;
+ }
+ }
+ }
+
+ send_feedback(last_received, requestReply, requestReply);
+ }
+
+ ResetLatch(&MyProc->procLatch);
+ }
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ *
+ * 'recvpos' is the latest LSN we've received data to, force is set if we need
+ * to send a response to avoid timeouts.
+ */
+static void
+send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
+{
+ static StringInfo reply_message = NULL;
+ static TimestampTz send_time = 0;
+
+ static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
+ static XLogRecPtr last_writepos = InvalidXLogRecPtr;
+ static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ TimestampTz now;
+ bool have_pending_txes;
+
+ /*
+ * If the user doesn't want status to be reported to the publisher, be
+ * sure to exit before doing anything at all.
+ */
+ if (!force && wal_receiver_status_interval <= 0)
+ return;
+
+ /* It's legal to not pass a recvpos */
+ if (recvpos < last_recvpos)
+ recvpos = last_recvpos;
+
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ /*
+ * No outstanding transactions to flush, we can report the latest
+ * received position. This is important for synchronous replication.
+ */
+ if (!have_pending_txes)
+ flushpos = writepos = recvpos;
+
+ if (writepos < last_writepos)
+ writepos = last_writepos;
+
+ if (flushpos < last_flushpos)
+ flushpos = last_flushpos;
+
+ now = GetCurrentTimestamp();
+
+ /* if we've already reported everything we're good */
+ if (!force &&
+ writepos == last_writepos &&
+ flushpos == last_flushpos &&
+ !TimestampDifferenceExceeds(send_time, now,
+ wal_receiver_status_interval * 1000))
+ return;
+ send_time = now;
+
+ if (!reply_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ reply_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(reply_message);
+
+ pq_sendbyte(reply_message, 'r');
+ pq_sendint64(reply_message, recvpos); /* write */
+ pq_sendint64(reply_message, flushpos); /* flush */
+ pq_sendint64(reply_message, writepos); /* apply */
+ pq_sendint64(reply_message, now); /* sendTime */
+ pq_sendbyte(reply_message, requestReply); /* replyRequested */
+
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ force,
+ (uint32) (recvpos >> 32), (uint32) recvpos,
+ (uint32) (writepos >> 32), (uint32) writepos,
+ (uint32) (flushpos >> 32), (uint32) flushpos
+ );
+
+ walrcv_send(wrconn, reply_message->data, reply_message->len);
+
+ if (recvpos > last_recvpos)
+ last_recvpos = recvpos;
+ if (writepos > last_writepos)
+ last_writepos = writepos;
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+}
+
+
+/*
+ * Reread subscription info and exit on change.
+ */
+static void
+reread_subscription(void)
+{
+ MemoryContext oldctx;
+ Subscription *newsub;
+
+ /* Ensure allocations in permanent context. */
+ oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+
+ newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+
+ /*
+ * Exit if connection string was changed. The launcher will start
+ * new worker.
+ */
+ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
+ {
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because the connection information was changed",
+ MySubscription->name)));
+
+ walrcv_disconnect(wrconn);
+ proc_exit(0);
+ }
+
+ /*
+ * Exit if publication list was changed. The launcher will start
+ * new worker.
+ */
+ if (!equal(newsub->publications, MySubscription->publications))
+ {
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because subscription's publications were changed",
+ MySubscription->name)));
+
+ walrcv_disconnect(wrconn);
+ proc_exit(0);
+ }
+
+ /*
+ * Exit if the subscription was removed.
+ * This normally should not happen as the worker gets killed
+ * during DROP SUBSCRIPTION.
+ */
+ if (!newsub)
+ {
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
+
+ walrcv_disconnect(wrconn);
+ proc_exit(0);
+ }
+
+ /*
+ * Exit if the subscription was disabled.
+ * This normally should not happen as the worker gets killed
+ * during ALTER SUBSCRIPTION ... DISABLE.
+ */
+ if (!newsub->enabled)
+ {
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
+
+ walrcv_disconnect(wrconn);
+ proc_exit(0);
+ }
+
+ /* Check for other changes that should never happen too. */
+ if (newsub->dbid != MySubscription->dbid ||
+ strcmp(newsub->name, MySubscription->name) != 0 ||
+ strcmp(newsub->slotname, MySubscription->slotname) != 0)
+ {
+ elog(ERROR, "subscription %u changed unexpectedly",
+ MyLogicalRepWorker->subid);
+ }
+
+ /* Clean old subscription info and switch to new one. */
+ FreeSubscription(MySubscription);
+ MySubscription = newsub;
+
+ MemoryContextSwitchTo(oldctx);
+
+ MySubscriptionValid = true;
+}
+
+/*
+ * Callback from subscription syscache invalidation.
+ */
+static void
+subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+ MySubscriptionValid = false;
+}
+
+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetObjectId(main_arg);
+ MemoryContext oldctx;
+ char originname[NAMEDATALEN];
+ RepOriginId originid;
+ XLogRecPtr origin_startpos;
+ char *err;
+ int server_version;
+ TimeLineID startpointTLI;
+ WalRcvStreamOptions options;
+
+ /* Attach to slot */
+ logicalrep_worker_attach(worker_slot);
+
+ /* Setup signal handling */
+ pqsignal(SIGTERM, logicalrep_worker_sigterm);
+ BackgroundWorkerUnblockSignals();
+
+ /* Initialise stats to a sanish value */
+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+ /* Make it easy to identify our processes. */
+ SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
+ PGC_USERSET, PGC_S_SESSION);
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ Assert(CurrentResourceOwner == NULL);
+ CurrentResourceOwner = ResourceOwnerCreate(NULL,
+ "logical replication apply");
+
+ /* Run as replica session replication role. */
+ SetConfigOption("session_replication_role", "replica",
+ PGC_SUSET, PGC_S_OVERRIDE);
+
+ /* Connect to our database. */
+ BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+ MyLogicalRepWorker->userid);
+
+ /* Load the subscription into persistent memory context. */
+ CreateCacheMemoryContext();
+ ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
+ "ApplyCacheContext",
+ ALLOCSET_DEFAULT_SIZES);
+ StartTransactionCommand();
+ oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+ MySubscriptionValid = true;
+ MemoryContextSwitchTo(oldctx);
+
+ if (!MySubscription->enabled)
+ {
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will not "
+ "start because the subscription was disabled during startup",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /* Keep us informed about subscription changes. */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
+ subscription_change_cb,
+ (Datum) 0);
+
+ ereport(LOG,
+ (errmsg("logical replication apply for subscription \"%s\" has started",
+ MySubscription->name)));
+
+ /* Setup replication origin tracking. */
+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ origin_startpos = replorigin_session_get_progress(false);
+
+ CommitTransactionCommand();
+
+ /* Connect to the origin and start the replication. */
+ elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
+ MySubscription->conninfo);
+ wrconn = walrcv_connect(MySubscription->conninfo, true,
+ MySubscription->name, &err);
+ if (wrconn == NULL)
+ ereport(ERROR,
+ (errmsg("could not connect to the publisher: %s", err)));
+
+ /*
+ * We don't really use the output identify_system for anything
+ * but it does some initializations on the upstream so let's still
+ * call it.
+ */
+ (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
+
+ /* Build logical replication streaming options. */
+ options.logical = true;
+ options.startpoint = origin_startpos;
+ options.slotname = MySubscription->slotname;
+ options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
+ options.proto.logical.publication_names = MySubscription->publications;
+
+ /* Start streaming from the slot. */
+ walrcv_startstreaming(wrconn, &options);
+
+ /* Run the main loop. */
+ ApplyLoop();
+
+ walrcv_disconnect(wrconn);
+
+ /* We should only get here if we received SIGTERM */
+ proc_exit(0);
+}