Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 1429 |
1 file changed, 1429 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
new file mode 100644
index 00000000000..7d86736444b
--- /dev/null
+++ b/src/backend/replication/logical/worker.c
@@ -0,0 +1,1429 @@
+/*-------------------------------------------------------------------------
+ * worker.c
+ *     PostgreSQL logical replication worker (apply)
+ *
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *    src/backend/replication/logical/worker.c
+ *
+ * NOTES
+ *    This file contains the worker which applies logical changes as they
+ *    come from the remote logical replication stream.
+ *
+ *    The main worker (apply) is started by the logical replication worker
+ *    launcher for every enabled subscription in a database.  It uses the
+ *    walsender protocol to communicate with the publisher.
+ *
+ *    The apply worker may spawn additional workers (sync) for initial data
+ *    synchronization of tables.
+ *
+ *    This module includes server-facing code and shares the
+ *    libpqwalreceiver module with walreceiver, which provides the
+ *    libpq-specific functionality.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "funcapi.h"
+
+#include "access/xact.h"
+#include "access/xlog_internal.h"
+
+#include "catalog/namespace.h"
+#include "catalog/pg_subscription.h"
+
+#include "commands/trigger.h"
+
+#include "executor/executor.h"
+#include "executor/nodeModifyTable.h"
+
+#include "libpq/pqformat.h"
+#include "libpq/pqsignal.h"
+
+#include "mb/pg_wchar.h"
+
+#include "nodes/makefuncs.h"
+
+#include "optimizer/planner.h"
+
+#include "parser/parse_relation.h"
+
+#include "postmaster/bgworker.h"
+#include "postmaster/postmaster.h"
+
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
+#include "replication/reorderbuffer.h"
+#include "replication/origin.h"
+#include "replication/snapbuild.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+
+#include "rewrite/rewriteHandler.h"
+
+#include "storage/bufmgr.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/datum.h"
+#include "utils/fmgroids.h"
+#include "utils/guc.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/timeout.h"
+#include "utils/tqual.h"
+#include "utils/syscache.h"
+
+#define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
+
+typedef struct FlushPosition
+{
+    dlist_node  node;
+    XLogRecPtr  local_end;
+    XLogRecPtr  remote_end;
+} FlushPosition;
+
+static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
+
+typedef struct SlotErrCallbackArg
+{
+    LogicalRepRelation *rel;
+    int         attnum;
+} SlotErrCallbackArg;
+
+static MemoryContext ApplyContext = NULL;
+static MemoryContext ApplyCacheContext = NULL;
+
+WalReceiverConn *wrconn = NULL;
+
+Subscription *MySubscription = NULL;
+bool        MySubscriptionValid = false;
+
+bool        in_remote_transaction = false;
+
+static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+
+static void store_flush_position(XLogRecPtr remote_lsn);
+
+static void reread_subscription(void);
+
+/*
+ * Make sure that we have started a local transaction.
+ *
+ * Also switches to ApplyContext as necessary.
+ */
+static bool
+ensure_transaction(void)
+{
+    if (IsTransactionState())
+    {
+        if (CurrentMemoryContext != ApplyContext)
+            MemoryContextSwitchTo(ApplyContext);
+        return false;
+    }
+
+    StartTransactionCommand();
+
+    if (!MySubscriptionValid)
+        reread_subscription();
+
+    MemoryContextSwitchTo(ApplyContext);
+    return true;
+}
+
+
+/*
+ * Executor state preparation for evaluation of constraint expressions,
+ * indexes and triggers.
+ *
+ * This is based on similar code in copy.c
+ */
+static EState *
+create_estate_for_relation(LogicalRepRelMapEntry *rel)
+{
+    EState     *estate;
+    ResultRelInfo *resultRelInfo;
+    RangeTblEntry *rte;
+
+    estate = CreateExecutorState();
+
+    rte = makeNode(RangeTblEntry);
+    rte->rtekind = RTE_RELATION;
+    rte->relid = RelationGetRelid(rel->localrel);
+    rte->relkind = rel->localrel->rd_rel->relkind;
+    estate->es_range_table = list_make1(rte);
+
+    resultRelInfo = makeNode(ResultRelInfo);
+    InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+
+    estate->es_result_relations = resultRelInfo;
+    estate->es_num_result_relations = 1;
+    estate->es_result_relation_info = resultRelInfo;
+
+    /* Triggers might need a slot */
+    if (resultRelInfo->ri_TrigDesc)
+        estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+
+    return estate;
+}
+
+/*
+ * Evaluate default values for columns for which we don't have a mapping to
+ * a remote relation column.
+ *
+ * This allows us to support tables which have more columns on the
+ * downstream than on the upstream.
+ */
+static void
+slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
+                   TupleTableSlot *slot)
+{
+    TupleDesc   desc = RelationGetDescr(rel->localrel);
+    int         num_phys_attrs = desc->natts;
+    int         i;
+    int         attnum,
+                num_defaults = 0;
+    int        *defmap;
+    ExprState **defexprs;
+    ExprContext *econtext;
+
+    econtext = GetPerTupleExprContext(estate);
+
+    /* We got all the data via replication, no need to evaluate anything. */
+    if (num_phys_attrs == rel->remoterel.natts)
+        return;
+
+    defmap = (int *) palloc(num_phys_attrs * sizeof(int));
+    defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
+
+    for (attnum = 0; attnum < num_phys_attrs; attnum++)
+    {
+        Expr       *defexpr;
+
+        if (desc->attrs[attnum]->attisdropped)
+            continue;
+
+        if (rel->attrmap[attnum] >= 0)
+            continue;
+
+        defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
+
+        if (defexpr != NULL)
+        {
+            /* Run the expression through the planner */
+            defexpr = expression_planner(defexpr);
+
+            /* Initialize the executable expression */
+            defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
+            defmap[num_defaults] = attnum;
+            num_defaults++;
+        }
+    }
+
+    for (i = 0; i < num_defaults; i++)
+        slot->tts_values[defmap[i]] =
+            ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
+}
+
+/*
+ * Error callback to give more context info about type conversion failure.
+ */
+static void
+slot_store_error_callback(void *arg)
+{
+    SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+    Oid         remotetypoid,
+                localtypoid;
+
+    if (errarg->attnum < 0)
+        return;
+
+    remotetypoid = errarg->rel->atttyps[errarg->attnum];
+    localtypoid = logicalrep_typmap_getid(remotetypoid);
+    errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
+               "remote type %s, local type %s",
+               errarg->rel->nspname, errarg->rel->relname,
+               errarg->rel->attnames[errarg->attnum],
+               format_type_be(remotetypoid),
+               format_type_be(localtypoid));
+}
+
+/*
+ * Store data in C string form into the slot.
+ * This is similar to BuildTupleFromCStrings, but a TupleTableSlot fits our
+ * use better.
+ */
+static void
+slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+                    char **values)
+{
+    int         natts = slot->tts_tupleDescriptor->natts;
+    int         i;
+    SlotErrCallbackArg errarg;
+    ErrorContextCallback errcallback;
+
+    ExecClearTuple(slot);
+
+    /* Push callback + info on the error context stack */
+    errarg.rel = &rel->remoterel;
+    errarg.attnum = -1;
+    errcallback.callback = slot_store_error_callback;
+    errcallback.arg = (void *) &errarg;
+    errcallback.previous = error_context_stack;
+    error_context_stack = &errcallback;
+
+    /* Call the "in" function for each non-dropped attribute */
+    for (i = 0; i < natts; i++)
+    {
+        Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+        int         remoteattnum = rel->attrmap[i];
+
+        if (!att->attisdropped && remoteattnum >= 0 &&
+            values[remoteattnum] != NULL)
+        {
+            Oid         typinput;
+            Oid         typioparam;
+
+            errarg.attnum = remoteattnum;
+
+            getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+            slot->tts_values[i] = OidInputFunctionCall(typinput,
+                                                       values[remoteattnum],
+                                                       typioparam,
+                                                       att->atttypmod);
+            slot->tts_isnull[i] = false;
+        }
+        else
+        {
+            /*
+             * We assign NULL to dropped attributes, NULL values, and missing
+             * values (missing values should be later filled using
+             * slot_fill_defaults).
+             */
+            slot->tts_values[i] = (Datum) 0;
+            slot->tts_isnull[i] = true;
+        }
+    }
+
+    /* Pop the error context stack */
+    error_context_stack = errcallback.previous;
+
+    ExecStoreVirtualTuple(slot);
+}
+
+/*
+ * Modify slot with user data provided as C strings.
+ * This is somewhat similar to heap_modify_tuple but also calls the type
+ * input function on the user data, as the input is the text representation
+ * of the types.
+ */
+static void
+slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+                     char **values, bool *replaces)
+{
+    int         natts = slot->tts_tupleDescriptor->natts;
+    int         i;
+    SlotErrCallbackArg errarg;
+    ErrorContextCallback errcallback;
+
+    slot_getallattrs(slot);
+    ExecClearTuple(slot);
+
+    /* Push callback + info on the error context stack */
+    errarg.rel = &rel->remoterel;
+    errarg.attnum = -1;
+    errcallback.callback = slot_store_error_callback;
+    errcallback.arg = (void *) &errarg;
+    errcallback.previous = error_context_stack;
+    error_context_stack = &errcallback;
+
+    /* Call the "in" function for each replaced attribute */
+    for (i = 0; i < natts; i++)
+    {
+        Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+        int         remoteattnum = rel->attrmap[i];
+
+        /* Keep the local value for columns that are not replicated. */
+        if (remoteattnum < 0)
+            continue;
+
+        /* Keep the old value for columns this change does not replace. */
+        if (!replaces[remoteattnum])
+            continue;
+
+        if (values[remoteattnum] != NULL)
+        {
+            Oid         typinput;
+            Oid         typioparam;
+
+            errarg.attnum = remoteattnum;
+
+            getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+            slot->tts_values[i] = OidInputFunctionCall(typinput,
+                                                       values[remoteattnum],
+                                                       typioparam,
+                                                       att->atttypmod);
+            slot->tts_isnull[i] = false;
+        }
+        else
+        {
+            slot->tts_values[i] = (Datum) 0;
+            slot->tts_isnull[i] = true;
+        }
+    }
+
+    /* Pop the error context stack */
+    error_context_stack = errcallback.previous;
+
+    ExecStoreVirtualTuple(slot);
+}
+
+/*
+ * Handle BEGIN message.
+ */
+static void
+apply_handle_begin(StringInfo s)
+{
+    LogicalRepBeginData begin_data;
+
+    logicalrep_read_begin(s, &begin_data);
+
+    replorigin_session_origin_timestamp = begin_data.committime;
+    replorigin_session_origin_lsn = begin_data.final_lsn;
+
+    in_remote_transaction = true;
+
+    pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle COMMIT message.
+ *
+ * TODO, support tracking of multiple origins
+ */
+static void
+apply_handle_commit(StringInfo s)
+{
+    LogicalRepCommitData commit_data;
+
+    logicalrep_read_commit(s, &commit_data);
+
+    Assert(commit_data.commit_lsn == replorigin_session_origin_lsn);
+    Assert(commit_data.committime == replorigin_session_origin_timestamp);
+
+    if (IsTransactionState())
+    {
+        CommitTransactionCommand();
+
+        store_flush_position(commit_data.end_lsn);
+    }
+
+    in_remote_transaction = false;
+
+    pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle ORIGIN message.
+ *
+ * TODO, support tracking of multiple origins
+ */
+static void
+apply_handle_origin(StringInfo s)
+{
+    /*
+     * The ORIGIN message can only come inside a remote transaction and
+     * before any actual writes.
+     */
+    if (!in_remote_transaction || IsTransactionState())
+        ereport(ERROR,
+                (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                 errmsg("ORIGIN message sent out of order")));
+}
+
+/*
+ * Handle RELATION message.
+ *
+ * Note we don't do validation against the local schema here.  That
+ * validation is postponed until the first change for the relation arrives,
+ * as we only care about it when applying changes for the relation anyway,
+ * and we do less locking this way.
+ */
+static void
+apply_handle_relation(StringInfo s)
+{
+    LogicalRepRelation *rel;
+
+    rel = logicalrep_read_rel(s);
+    logicalrep_relmap_update(rel);
+}
+
+/*
+ * Handle TYPE message.
+ *
+ * Note we don't do local mapping here, that's done when the type is
+ * actually used.
+ */
+static void
+apply_handle_type(StringInfo s)
+{
+    LogicalRepTyp typ;
+
+    logicalrep_read_typ(s, &typ);
+    logicalrep_typmap_update(&typ);
+}
+
+/*
+ * Get the replica identity index, or if it is not defined, the primary key.
+ *
+ * If neither is defined, returns InvalidOid.
+ */
+static Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+    Oid         idxoid;
+
+    idxoid = RelationGetReplicaIndex(rel);
+
+    if (!OidIsValid(idxoid))
+        idxoid = RelationGetPrimaryKeyIndex(rel);
+
+    return idxoid;
+}
+
+/*
+ * Handle INSERT message.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+    LogicalRepRelMapEntry *rel;
+    LogicalRepTupleData newtup;
+    LogicalRepRelId relid;
+    EState     *estate;
+    TupleTableSlot *remoteslot;
+    MemoryContext oldctx;
+
+    ensure_transaction();
+
+    relid = logicalrep_read_insert(s, &newtup);
+    rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+    /* Initialize the executor state. */
+    estate = create_estate_for_relation(rel);
+    remoteslot = ExecInitExtraTupleSlot(estate);
+    ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+
+    /* Process and store remote tuple in the slot */
+    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+    slot_store_cstrings(remoteslot, rel, newtup.values);
+    slot_fill_defaults(rel, estate, remoteslot);
+    MemoryContextSwitchTo(oldctx);
+
+    PushActiveSnapshot(GetTransactionSnapshot());
+    ExecOpenIndices(estate->es_result_relation_info, false);
+
+    /* Do the insert. */
+    ExecSimpleRelationInsert(estate, remoteslot);
+
+    /* Cleanup. */
+    ExecCloseIndices(estate->es_result_relation_info);
+    PopActiveSnapshot();
+    ExecResetTupleTable(estate->es_tupleTable, false);
+    FreeExecutorState(estate);
+
+    logicalrep_rel_close(rel, NoLock);
+
+    CommandCounterIncrement();
+}
+
+/*
+ * Check if the logical replication relation is updatable and throw
+ * an appropriate error if it isn't.
+ */
+static void
+check_relation_updatable(LogicalRepRelMapEntry *rel)
+{
+    /* Updatable, no error. */
+    if (rel->updatable)
+        return;
+
+    /*
+     * We are about to raise an error, so it's fine that this is somewhat
+     * slow; it's better to give the user a correct error message.
+     */
+    if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("publisher does not send replica identity column "
+                        "expected by the logical replication target relation \"%s.%s\"",
+                        rel->remoterel.nspname, rel->remoterel.relname)));
+    }
+
+    ereport(ERROR,
+            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+             errmsg("logical replication target relation \"%s.%s\" has "
+                    "neither REPLICA IDENTITY index nor PRIMARY "
+                    "KEY and published relation does not have "
+                    "REPLICA IDENTITY FULL",
+                    rel->remoterel.nspname, rel->remoterel.relname)));
+}
+
+/*
+ * Handle UPDATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_update(StringInfo s)
+{
+    LogicalRepRelMapEntry *rel;
+    LogicalRepRelId relid;
+    Oid         idxoid;
+    EState     *estate;
+    EPQState    epqstate;
+    LogicalRepTupleData oldtup;
+    LogicalRepTupleData newtup;
+    bool        has_oldtup;
+    TupleTableSlot *localslot;
+    TupleTableSlot *remoteslot;
+    bool        found;
+    MemoryContext oldctx;
+
+    ensure_transaction();
+
+    relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
+                                   &newtup);
+    rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+    /* Check if we can do the update. */
+    check_relation_updatable(rel);
+
+    /* Initialize the executor state. */
+    estate = create_estate_for_relation(rel);
+    remoteslot = ExecInitExtraTupleSlot(estate);
+    ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+    localslot = ExecInitExtraTupleSlot(estate);
+    ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
+    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+    PushActiveSnapshot(GetTransactionSnapshot());
+    ExecOpenIndices(estate->es_result_relation_info, false);
+
+    /* Build the search tuple. */
+    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+    slot_store_cstrings(remoteslot, rel,
+                        has_oldtup ? oldtup.values : newtup.values);
+    MemoryContextSwitchTo(oldctx);
+
+    /*
+     * Try to find the tuple using either the replica identity index, the
+     * primary key, or, if needed, a sequential scan.
+     */
+    idxoid = GetRelationIdentityOrPK(rel->localrel);
+    Assert(OidIsValid(idxoid) ||
+           (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
+
+    if (OidIsValid(idxoid))
+        found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+                                             LockTupleExclusive,
+                                             remoteslot, localslot);
+    else
+        found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+                                         remoteslot, localslot);
+
+    ExecClearTuple(remoteslot);
+
+    /*
+     * Tuple found.
+     *
+     * Note this will fail if there are other conflicting unique indexes.
+     */
+    if (found)
+    {
+        /* Process and store remote tuple in the slot */
+        oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+        ExecStoreTuple(localslot->tts_tuple, remoteslot, InvalidBuffer, false);
+        slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
+        MemoryContextSwitchTo(oldctx);
+
+        EvalPlanQualSetSlot(&epqstate, remoteslot);
+
+        /* Do the actual update. */
+        ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
+    }
+    else
+    {
+        /*
+         * The tuple to be updated could not be found.
+         *
+         * TODO what to do here, change the log level to LOG perhaps?
+         */
+        elog(DEBUG1,
+             "logical replication did not find row for update "
+             "in replication target relation \"%s\"",
+             RelationGetRelationName(rel->localrel));
+    }
+
+    /* Cleanup. */
+    ExecCloseIndices(estate->es_result_relation_info);
+    PopActiveSnapshot();
+    EvalPlanQualEnd(&epqstate);
+    ExecResetTupleTable(estate->es_tupleTable, false);
+    FreeExecutorState(estate);
+
+    logicalrep_rel_close(rel, NoLock);
+
+    CommandCounterIncrement();
+}
+
+/*
+ * Handle DELETE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_delete(StringInfo s)
+{
+    LogicalRepRelMapEntry *rel;
+    LogicalRepTupleData oldtup;
+    LogicalRepRelId relid;
+    Oid         idxoid;
+    EState     *estate;
+    EPQState    epqstate;
+    TupleTableSlot *remoteslot;
+    TupleTableSlot *localslot;
+    bool        found;
+    MemoryContext oldctx;
+
+    ensure_transaction();
+
+    relid = logicalrep_read_delete(s, &oldtup);
+    rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+    /* Check if we can do the delete. */
+    check_relation_updatable(rel);
+
+    /* Initialize the executor state. */
+    estate = create_estate_for_relation(rel);
+    remoteslot = ExecInitExtraTupleSlot(estate);
+    ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->localrel));
+    localslot = ExecInitExtraTupleSlot(estate);
+    ExecSetSlotDescriptor(localslot, RelationGetDescr(rel->localrel));
+    EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+
+    PushActiveSnapshot(GetTransactionSnapshot());
+    ExecOpenIndices(estate->es_result_relation_info, false);
+
+    /* Build the search tuple. */
+    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+    slot_store_cstrings(remoteslot, rel, oldtup.values);
+    MemoryContextSwitchTo(oldctx);
+
+    /*
+     * Try to find the tuple using either the replica identity index, the
+     * primary key, or, if needed, a sequential scan.
+     */
+    idxoid = GetRelationIdentityOrPK(rel->localrel);
+    Assert(OidIsValid(idxoid) ||
+           (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
+
+    if (OidIsValid(idxoid))
+        found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
+                                             LockTupleExclusive,
+                                             remoteslot, localslot);
+    else
+        found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
+                                         remoteslot, localslot);
+
+    /* If found, delete it. */
+    if (found)
+    {
+        EvalPlanQualSetSlot(&epqstate, localslot);
+
+        /* Do the actual delete. */
+        ExecSimpleRelationDelete(estate, &epqstate, localslot);
+    }
+    else
+    {
+        /* The tuple to be deleted could not be found. */
+        ereport(DEBUG1,
+                (errmsg("logical replication could not find row for delete "
+                        "in replication target relation \"%s\"",
+                        RelationGetRelationName(rel->localrel))));
+    }
+
+    /* Cleanup. */
+    ExecCloseIndices(estate->es_result_relation_info);
+    PopActiveSnapshot();
+    EvalPlanQualEnd(&epqstate);
+    ExecResetTupleTable(estate->es_tupleTable, false);
+    FreeExecutorState(estate);
+
+    logicalrep_rel_close(rel, NoLock);
+
+    CommandCounterIncrement();
+}
+
+
+/*
+ * Logical replication protocol message dispatcher.
+ */
+static void
+apply_dispatch(StringInfo s)
+{
+    char        action = pq_getmsgbyte(s);
+
+    switch (action)
+    {
+            /* BEGIN */
+        case 'B':
+            apply_handle_begin(s);
+            break;
+            /* COMMIT */
+        case 'C':
+            apply_handle_commit(s);
+            break;
+            /* INSERT */
+        case 'I':
+            apply_handle_insert(s);
+            break;
+            /* UPDATE */
+        case 'U':
+            apply_handle_update(s);
+            break;
+            /* DELETE */
+        case 'D':
+            apply_handle_delete(s);
+            break;
+            /* RELATION */
+        case 'R':
+            apply_handle_relation(s);
+            break;
+            /* TYPE */
+        case 'Y':
+            apply_handle_type(s);
+            break;
+            /* ORIGIN */
+        case 'O':
+            apply_handle_origin(s);
+            break;
+        default:
+            ereport(ERROR,
+                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                     errmsg("invalid logical replication message type %c", action)));
+    }
+}
+
+/*
+ * Figure out which write/flush positions to report to the walsender process.
+ *
+ * We can't simply report back the last LSN the walsender sent us because the
+ * local transaction might not yet be flushed to disk locally.  Instead we
+ * build a list that associates local with remote LSNs for every commit.  When
+ * reporting back the flush position to the sender we iterate that list and
+ * check which entries on it are already locally flushed.  Those we can report
+ * as having been flushed.
+ *
+ * *have_pending_txes is set to true if there are outstanding transactions
+ * that still need to be flushed.
+ */
+static void
+get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
+                   bool *have_pending_txes)
+{
+    dlist_mutable_iter iter;
+    XLogRecPtr  local_flush = GetFlushRecPtr();
+
+    *write = InvalidXLogRecPtr;
+    *flush = InvalidXLogRecPtr;
+
+    dlist_foreach_modify(iter, &lsn_mapping)
+    {
+        FlushPosition *pos =
+            dlist_container(FlushPosition, node, iter.cur);
+
+        *write = pos->remote_end;
+
+        if (pos->local_end <= local_flush)
+        {
+            *flush = pos->remote_end;
+            dlist_delete(iter.cur);
+            pfree(pos);
+        }
+        else
+        {
+            /*
+             * Don't want to uselessly iterate over the rest of the list which
+             * could potentially be long.  Instead get the last element and
+             * grab the write position from there.
+             */
+            pos = dlist_tail_element(FlushPosition, node,
+                                     &lsn_mapping);
+            *write = pos->remote_end;
+            *have_pending_txes = true;
+            return;
+        }
+    }
+
+    *have_pending_txes = !dlist_is_empty(&lsn_mapping);
+}
+
+/*
+ * Store the current remote/local LSN pair in the tracking list.
+ */
+static void
+store_flush_position(XLogRecPtr remote_lsn)
+{
+    FlushPosition *flushpos;
+
+    /* Need to do this in the permanent context */
+    MemoryContextSwitchTo(ApplyCacheContext);
+
+    /* Track commit lsn */
+    flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
+    flushpos->local_end = XactLastCommitEnd;
+    flushpos->remote_end = remote_lsn;
+
+    dlist_push_tail(&lsn_mapping, &flushpos->node);
+    MemoryContextSwitchTo(ApplyContext);
+}
+
+
+/* Update statistics of the worker. */
+static void
+UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
+{
+    MyLogicalRepWorker->last_lsn = last_lsn;
+    MyLogicalRepWorker->last_send_time = send_time;
+    MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
+    if (reply)
+    {
+        MyLogicalRepWorker->reply_lsn = last_lsn;
+        MyLogicalRepWorker->reply_time = send_time;
+    }
+}
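/*
 * Editor's aside, not part of the patch: the get_flush_position() walk above
 * is easiest to see with concrete numbers.  This minimal standalone sketch
 * simulates the same bookkeeping with a plain array instead of a dlist; all
 * LSN values here are made up.
 */
#include <stdint.h>
#include <stdio.h>

typedef struct
{
    uint64_t    local_end;      /* local commit LSN of an applied tx */
    uint64_t    remote_end;     /* corresponding remote end LSN */
} Pair;

int
main(void)
{
    /* Three applied commits, recorded oldest-first. */
    Pair        map[] = {{100, 1000}, {200, 2000}, {300, 3000}};
    uint64_t    local_flush = 250;  /* what GetFlushRecPtr() might return */
    uint64_t    write = 0;
    uint64_t    flush = 0;
    int         n = 3;

    for (int i = 0; i < n; i++)
    {
        write = map[i].remote_end;
        if (map[i].local_end <= local_flush)
            flush = map[i].remote_end;  /* locally durable: safe to report */
        else
        {
            /* Stop early; report the newest remote LSN as written. */
            write = map[n - 1].remote_end;
            break;
        }
    }

    /* Prints "write 3000 flush 2000": the third commit isn't flushed yet. */
    printf("write %llu flush %llu\n",
           (unsigned long long) write, (unsigned long long) flush);
    return 0;
}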
+
+/*
+ * Apply main loop.
+ */
+static void
+ApplyLoop(void)
+{
+    XLogRecPtr  last_received = InvalidXLogRecPtr;
+
+    /* Init the ApplyContext which we use for easier cleanup. */
+    ApplyContext = AllocSetContextCreate(TopMemoryContext,
+                                         "ApplyContext",
+                                         ALLOCSET_DEFAULT_MINSIZE,
+                                         ALLOCSET_DEFAULT_INITSIZE,
+                                         ALLOCSET_DEFAULT_MAXSIZE);
+
+    /* mark as idle, before starting to loop */
+    pgstat_report_activity(STATE_IDLE, NULL);
+
+    while (!got_SIGTERM)
+    {
+        pgsocket    fd = PGINVALID_SOCKET;
+        int         rc;
+        int         len;
+        char       *buf = NULL;
+        bool        endofstream = false;
+        TimestampTz last_recv_timestamp = GetCurrentTimestamp();
+        bool        ping_sent = false;
+
+        MemoryContextSwitchTo(ApplyContext);
+
+        len = walrcv_receive(wrconn, &buf, &fd);
+
+        if (len != 0)
+        {
+            /* Process the data */
+            for (;;)
+            {
+                CHECK_FOR_INTERRUPTS();
+
+                if (len == 0)
+                {
+                    break;
+                }
+                else if (len < 0)
+                {
+                    ereport(LOG,
+                            (errmsg("data stream from publisher has ended")));
+                    endofstream = true;
+                    break;
+                }
+                else
+                {
+                    int         c;
+                    StringInfoData s;
+
+                    /* Reset timeout. */
+                    last_recv_timestamp = GetCurrentTimestamp();
+                    ping_sent = false;
+
+                    /* Ensure we are reading the data into our memory context. */
+                    MemoryContextSwitchTo(ApplyContext);
+
+                    s.data = buf;
+                    s.len = len;
+                    s.cursor = 0;
+                    s.maxlen = -1;
+
+                    c = pq_getmsgbyte(&s);
+
+                    if (c == 'w')
+                    {
+                        XLogRecPtr  start_lsn;
+                        XLogRecPtr  end_lsn;
+                        TimestampTz send_time;
+
+                        start_lsn = pq_getmsgint64(&s);
+                        end_lsn = pq_getmsgint64(&s);
+                        send_time =
+                            IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
+
+                        if (last_received < start_lsn)
+                            last_received = start_lsn;
+
+                        if (last_received < end_lsn)
+                            last_received = end_lsn;
+
+                        UpdateWorkerStats(last_received, send_time, false);
+
+                        apply_dispatch(&s);
+                    }
+                    else if (c == 'k')
+                    {
+                        XLogRecPtr  endpos;
+                        TimestampTz timestamp;
+                        bool        reply_requested;
+
+                        endpos = pq_getmsgint64(&s);
+                        timestamp =
+                            IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
+                        reply_requested = pq_getmsgbyte(&s);
+
+                        send_feedback(endpos, reply_requested, false);
+                        UpdateWorkerStats(last_received, timestamp, true);
+                    }
+                    /* other message types are purposefully ignored */
+                }
+
+                len = walrcv_receive(wrconn, &buf, &fd);
+            }
+        }
+
+        if (!in_remote_transaction)
+        {
+            /*
+             * If we didn't get any transactions for a while there might be
+             * unconsumed invalidation messages in the queue, consume them
+             * now.
+             */
+            StartTransactionCommand();
+            /* Check for subscription change */
+            if (!MySubscriptionValid)
+                reread_subscription();
+            CommitTransactionCommand();
+        }
+
+        /* confirm all writes at once */
+        send_feedback(last_received, false, false);
+
+        /* Cleanup the memory. */
+        MemoryContextResetAndDeleteChildren(ApplyContext);
+        MemoryContextSwitchTo(TopMemoryContext);
+
+        /* Check if we need to exit the streaming loop. */
+        if (endofstream)
+            break;
+
+        /*
+         * Wait for more data or latch.
+         */
+        rc = WaitLatchOrSocket(&MyProc->procLatch,
+                               WL_SOCKET_READABLE | WL_LATCH_SET |
+                               WL_TIMEOUT | WL_POSTMASTER_DEATH,
+                               fd, NAPTIME_PER_CYCLE,
+                               WAIT_EVENT_LOGICAL_APPLY_MAIN);
+
+        /* Emergency bailout if postmaster has died */
+        if (rc & WL_POSTMASTER_DEATH)
+            proc_exit(1);
+
+        if (rc & WL_TIMEOUT)
+        {
+            /*
+             * We didn't receive anything new.  If we haven't heard anything
+             * from the server for more than wal_receiver_timeout / 2, ping
+             * the server.  Also, if it's been longer than
+             * wal_receiver_status_interval since the last update we sent,
+             * send a status update to the master anyway, to report any
+             * progress in applying WAL.
+             */
+            bool        requestReply = false;
+
+            /*
+             * Check if the time since the last receive from the publisher
+             * has reached the configured limit.
+             */
+            if (wal_receiver_timeout > 0)
+            {
+                TimestampTz now = GetCurrentTimestamp();
+                TimestampTz timeout;
+
+                timeout =
+                    TimestampTzPlusMilliseconds(last_recv_timestamp,
+                                                wal_receiver_timeout);
+
+                if (now >= timeout)
+                    ereport(ERROR,
+                            (errmsg("terminating logical replication worker due to timeout")));
+
+                /*
+                 * If we didn't receive anything new for half of the receiver
+                 * replication timeout, ping the server.
+                 */
+                if (!ping_sent)
+                {
+                    timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+                                                          (wal_receiver_timeout / 2));
+                    if (now >= timeout)
+                    {
+                        requestReply = true;
+                        ping_sent = true;
+                    }
+                }
+            }
+
+            send_feedback(last_received, requestReply, requestReply);
+        }
+
+        ResetLatch(&MyProc->procLatch);
+    }
+}
+
+/*
+ * Send a Standby Status Update message to the server.
+ *
+ * 'recvpos' is the latest LSN we've received data to.  'force' is set if we
+ * need to send a response to avoid timeouts.
+ */
+static void
+send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
+{
+    static StringInfo reply_message = NULL;
+    static TimestampTz send_time = 0;
+
+    static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
+    static XLogRecPtr last_writepos = InvalidXLogRecPtr;
+    static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
+    XLogRecPtr  writepos;
+    XLogRecPtr  flushpos;
+    TimestampTz now;
+    bool        have_pending_txes;
+
+    /*
+     * If the user doesn't want status to be reported to the publisher, be
+     * sure to exit before doing anything at all.
+     */
+    if (!force && wal_receiver_status_interval <= 0)
+        return;
+
+    /* It's legal to not pass a recvpos */
+    if (recvpos < last_recvpos)
+        recvpos = last_recvpos;
+
+    get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+    /*
+     * No outstanding transactions to flush, we can report the latest
+     * received position.  This is important for synchronous replication.
+     */
+    if (!have_pending_txes)
+        flushpos = writepos = recvpos;
+
+    if (writepos < last_writepos)
+        writepos = last_writepos;
+
+    if (flushpos < last_flushpos)
+        flushpos = last_flushpos;
+
+    now = GetCurrentTimestamp();
+
+    /* if we've already reported everything we're good */
+    if (!force &&
+        writepos == last_writepos &&
+        flushpos == last_flushpos &&
+        !TimestampDifferenceExceeds(send_time, now,
+                                    wal_receiver_status_interval * 1000))
+        return;
+    send_time = now;
+
+    if (!reply_message)
+    {
+        MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+
+        reply_message = makeStringInfo();
+        MemoryContextSwitchTo(oldctx);
+    }
+    else
+        resetStringInfo(reply_message);
+
+    pq_sendbyte(reply_message, 'r');
+    pq_sendint64(reply_message, recvpos);       /* write */
+    pq_sendint64(reply_message, flushpos);      /* flush */
+    pq_sendint64(reply_message, writepos);      /* apply */
+    pq_sendint64(reply_message, now);           /* sendTime */
+    pq_sendbyte(reply_message, requestReply);   /* replyRequested */
+
+    elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+         force,
+         (uint32) (recvpos >> 32), (uint32) recvpos,
+         (uint32) (writepos >> 32), (uint32) writepos,
+         (uint32) (flushpos >> 32), (uint32) flushpos);
+
+    walrcv_send(wrconn, reply_message->data, reply_message->len);
+
+    if (recvpos > last_recvpos)
+        last_recvpos = recvpos;
+    if (writepos > last_writepos)
+        last_writepos = writepos;
+    if (flushpos > last_flushpos)
+        last_flushpos = flushpos;
+}
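/*
 * Editor's aside, not part of the patch: the wire layout of the 'r'
 * (Standby Status Update) message that send_feedback() builds above, as a
 * standalone builder.  pq_sendint64 writes big-endian, which put_be64
 * mirrors; the helper names here are invented for the illustration.
 */
#include <stdint.h>
#include <stddef.h>
#include <stdio.h>

static void
put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
    {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
}

static size_t
build_feedback(unsigned char *buf, uint64_t recvpos, uint64_t flushpos,
               uint64_t writepos, int64_t sendtime, int reply_requested)
{
    buf[0] = 'r';                       /* message type */
    put_be64(buf + 1, recvpos);         /* write position */
    put_be64(buf + 9, flushpos);        /* flush position */
    put_be64(buf + 17, writepos);       /* apply position */
    put_be64(buf + 25, (uint64_t) sendtime);    /* send timestamp */
    buf[33] = reply_requested ? 1 : 0;  /* replyRequested flag */
    return 34;                          /* total message length */
}

int
main(void)
{
    unsigned char msg[34];
    size_t      len = build_feedback(msg, 0x12345678, 0x1234, 0x2345, 0, 0);

    printf("message type '%c', %zu bytes\n", msg[0], len);
    return 0;
}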
+
+
+/*
+ * Reread subscription info and exit on change.
+ */
+static void
+reread_subscription(void)
+{
+    MemoryContext oldctx;
+    Subscription *newsub;
+
+    /* Ensure allocations in permanent context. */
+    oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+
+    newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+
+    /*
+     * Exit if the subscription was removed.  This normally should not
+     * happen as the worker gets killed during DROP SUBSCRIPTION.
+     *
+     * GetSubscription was called with missing_ok, so newsub may be NULL;
+     * check this before examining any of its fields.
+     */
+    if (!newsub)
+    {
+        ereport(LOG,
+                (errmsg("logical replication worker for subscription \"%s\" will "
+                        "stop because the subscription was removed",
+                        MySubscription->name)));
+
+        walrcv_disconnect(wrconn);
+        proc_exit(0);
+    }
+
+    /*
+     * Exit if the connection string was changed.  The launcher will start a
+     * new worker.
+     */
+    if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
+    {
+        ereport(LOG,
+                (errmsg("logical replication worker for subscription \"%s\" will "
+                        "restart because the connection information was changed",
+                        MySubscription->name)));
+
+        walrcv_disconnect(wrconn);
+        proc_exit(0);
+    }
+
+    /*
+     * Exit if the publication list was changed.  The launcher will start a
+     * new worker.
+     */
+    if (!equal(newsub->publications, MySubscription->publications))
+    {
+        ereport(LOG,
+                (errmsg("logical replication worker for subscription \"%s\" will "
+                        "restart because subscription's publications were changed",
+                        MySubscription->name)));
+
+        walrcv_disconnect(wrconn);
+        proc_exit(0);
+    }
+
+    /*
+     * Exit if the subscription was disabled.  This normally should not
+     * happen as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
+     */
+    if (!newsub->enabled)
+    {
+        ereport(LOG,
+                (errmsg("logical replication worker for subscription \"%s\" will "
+                        "stop because the subscription was disabled",
+                        MySubscription->name)));
+
+        walrcv_disconnect(wrconn);
+        proc_exit(0);
+    }
+
+    /* Also check for other changes that should never happen. */
+    if (newsub->dbid != MySubscription->dbid ||
+        strcmp(newsub->name, MySubscription->name) != 0 ||
+        strcmp(newsub->slotname, MySubscription->slotname) != 0)
+    {
+        elog(ERROR, "subscription %u changed unexpectedly",
+             MyLogicalRepWorker->subid);
+    }
+
+    /* Clean old subscription info and switch to new one. */
+    FreeSubscription(MySubscription);
+    MySubscription = newsub;
+
+    MemoryContextSwitchTo(oldctx);
+
+    MySubscriptionValid = true;
+}
+
+/*
+ * Callback from subscription syscache invalidation.
+ */
+static void
+subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+    MySubscriptionValid = false;
+}
+
+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+    int         worker_slot = DatumGetObjectId(main_arg);
+    MemoryContext oldctx;
+    char        originname[NAMEDATALEN];
+    RepOriginId originid;
+    XLogRecPtr  origin_startpos;
+    char       *err;
+    int         server_version;
+    TimeLineID  startpointTLI;
+    WalRcvStreamOptions options;
+
+    /* Attach to slot */
+    logicalrep_worker_attach(worker_slot);
+
+    /* Setup signal handling */
+    pqsignal(SIGTERM, logicalrep_worker_sigterm);
+    BackgroundWorkerUnblockSignals();
+
+    /* Initialize stats to a sane value */
+    MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+        MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+    /* Make it easy to identify our processes. */
+    SetConfigOption("application_name", MyBgworkerEntry->bgw_name,
+                    PGC_USERSET, PGC_S_SESSION);
+
+    /* Load the libpq-specific functions */
+    load_file("libpqwalreceiver", false);
+
+    Assert(CurrentResourceOwner == NULL);
+    CurrentResourceOwner = ResourceOwnerCreate(NULL,
+                                               "logical replication apply");
+
+    /* Run as replica session replication role. */
+    SetConfigOption("session_replication_role", "replica",
+                    PGC_SUSET, PGC_S_OVERRIDE);
+
+    /* Connect to our database. */
+    BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+                                              MyLogicalRepWorker->userid);
+
+    /* Load the subscription into the persistent memory context. */
+    CreateCacheMemoryContext();
+    ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
+                                              "ApplyCacheContext",
+                                              ALLOCSET_DEFAULT_SIZES);
+    StartTransactionCommand();
+    oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+    MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+    MySubscriptionValid = true;
+    MemoryContextSwitchTo(oldctx);
+
+    if (!MySubscription->enabled)
+    {
+        ereport(LOG,
+                (errmsg("logical replication worker for subscription \"%s\" will not "
+                        "start because the subscription was disabled during startup",
+                        MySubscription->name)));
+
+        proc_exit(0);
+    }
+
+    /* Keep us informed about subscription changes. */
+    CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
+                                  subscription_change_cb,
+                                  (Datum) 0);
+
+    ereport(LOG,
+            (errmsg("logical replication apply for subscription \"%s\" has started",
+                    MySubscription->name)));
+
+    /* Setup replication origin tracking. */
+    snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+    originid = replorigin_by_name(originname, true);
+    if (!OidIsValid(originid))
+        originid = replorigin_create(originname);
+    replorigin_session_setup(originid);
+    replorigin_session_origin = originid;
+    origin_startpos = replorigin_session_get_progress(false);
+
+    CommitTransactionCommand();
+
+    /* Connect to the origin and start the replication. */
+    elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
+         MySubscription->conninfo);
+    wrconn = walrcv_connect(MySubscription->conninfo, true,
+                            MySubscription->name, &err);
+    if (wrconn == NULL)
+        ereport(ERROR,
+                (errmsg("could not connect to the publisher: %s", err)));
+
+    /*
+     * We don't really use the output of identify_system for anything, but
+     * it does some initialization on the upstream, so let's still call it.
+     */
+    (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version);
+
+    /* Build logical replication streaming options. */
+    options.logical = true;
+    options.startpoint = origin_startpos;
+    options.slotname = MySubscription->slotname;
+    options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
+    options.proto.logical.publication_names = MySubscription->publications;
+
+    /* Start streaming from the slot. */
+    walrcv_startstreaming(wrconn, &options);
+
+    /* Run the main loop. */
+    ApplyLoop();
+
+    walrcv_disconnect(wrconn);
+
+    /* We should only get here if we received SIGTERM */
+    proc_exit(0);
+}
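For reference, the two copy-stream message kinds the worker consumes in ApplyLoop above ('w' WAL data and 'k' keepalive) have simple fixed headers. The following is a minimal standalone sketch of a parser for those headers, an illustration derived from the field reads in ApplyLoop rather than code from this commit; the helper names are invented.

/* Editor's illustration: parse the fixed headers of 'w' and 'k' messages. */
#include <stdint.h>
#include <stdio.h>

static uint64_t
read_be64(const unsigned char *p)
{
    uint64_t    v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];    /* fields are sent big-endian */
    return v;
}

/* Returns the header size consumed, or -1 if the message is unusable. */
static int
parse_copy_stream_message(const unsigned char *buf, size_t len)
{
    if (len < 1)
        return -1;

    switch (buf[0])
    {
        case 'w':               /* WAL data: start LSN, end LSN, send time,
                                 * then the logical replication message */
            if (len < 1 + 8 * 3)
                return -1;
            printf("w: start %llX end %llX\n",
                   (unsigned long long) read_be64(buf + 1),
                   (unsigned long long) read_be64(buf + 9));
            return 1 + 8 * 3;
        case 'k':               /* keepalive: end LSN, timestamp, reply flag */
            if (len < 1 + 8 + 8 + 1)
                return -1;
            printf("k: end %llX reply requested: %d\n",
                   (unsigned long long) read_be64(buf + 1), buf[17]);
            return 1 + 8 + 8 + 1;
        default:
            return -1;          /* the worker ignores other message kinds */
    }
}

int
main(void)
{
    /* A fabricated keepalive: end LSN 0x16B3748, timestamp 0, reply = 1. */
    unsigned char msg[18] = {'k'};

    msg[5] = 0x01;
    msg[6] = 0x6B;
    msg[7] = 0x37;
    msg[8] = 0x48;
    msg[17] = 1;
    parse_copy_stream_message(msg, sizeof(msg));
    return 0;
}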