aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/worker.c79
1 files changed, 42 insertions, 37 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa3811783f6..673ebd211d1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -122,6 +122,10 @@ static void apply_handle_update_internal(ResultRelInfo *relinfo,
static void apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
TupleTableSlot *remoteslot,
LogicalRepRelation *remoterel);
+static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
+ LogicalRepRelation *remoterel,
+ TupleTableSlot *remoteslot,
+ TupleTableSlot **localslot);
/*
* Should this worker apply changes for given relation.
@@ -788,33 +792,17 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
LogicalRepRelMapEntry *relmapentry)
{
Relation localrel = relinfo->ri_RelationDesc;
- Oid idxoid;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
MemoryContext oldctx;
- localslot = table_slot_create(localrel, &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
-
ExecOpenIndices(relinfo, false);
- /*
- * Try to find tuple using either replica identity index, primary key or
- * if needed, sequential scan.
- */
- idxoid = GetRelationIdentityOrPK(localrel);
- Assert(OidIsValid(idxoid) ||
- (relmapentry->remoterel.replident == REPLICA_IDENTITY_FULL));
-
- if (OidIsValid(idxoid))
- found = RelationFindReplTupleByIndex(localrel, idxoid,
- LockTupleExclusive,
- remoteslot, localslot);
- else
- found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
- remoteslot, localslot);
-
+ found = FindReplTupleInLocalRel(estate, localrel,
+ &relmapentry->remoterel,
+ remoteslot, &localslot);
ExecClearTuple(remoteslot);
/*
@@ -922,31 +910,15 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
LogicalRepRelation *remoterel)
{
Relation localrel = relinfo->ri_RelationDesc;
- Oid idxoid;
EPQState epqstate;
TupleTableSlot *localslot;
bool found;
- localslot = table_slot_create(localrel, &estate->es_tupleTable);
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
-
ExecOpenIndices(relinfo, false);
- /*
- * Try to find tuple using either replica identity index, primary key or
- * if needed, sequential scan.
- */
- idxoid = GetRelationIdentityOrPK(localrel);
- Assert(OidIsValid(idxoid) ||
- (remoterel->replident == REPLICA_IDENTITY_FULL));
-
- if (OidIsValid(idxoid))
- found = RelationFindReplTupleByIndex(localrel, idxoid,
- LockTupleExclusive,
- remoteslot, localslot);
- else
- found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
- remoteslot, localslot);
+ found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+ remoteslot, &localslot);
/* If found delete it. */
if (found)
@@ -971,6 +943,39 @@ apply_handle_delete_internal(ResultRelInfo *relinfo, EState *estate,
}
/*
+ * Try to find a tuple received from the publication side (in 'remoteslot') in
+ * the corresponding local relation using either replica identity index,
+ * primary key or if needed, sequential scan.
+ *
+ * Local tuple, if found, is returned in '*localslot'.
+ */
+static bool
+FindReplTupleInLocalRel(EState *estate, Relation localrel,
+ LogicalRepRelation *remoterel,
+ TupleTableSlot *remoteslot,
+ TupleTableSlot **localslot)
+{
+ Oid idxoid;
+ bool found;
+
+ *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+
+ idxoid = GetRelationIdentityOrPK(localrel);
+ Assert(OidIsValid(idxoid) ||
+ (remoterel->replident == REPLICA_IDENTITY_FULL));
+
+ if (OidIsValid(idxoid))
+ found = RelationFindReplTupleByIndex(localrel, idxoid,
+ LockTupleExclusive,
+ remoteslot, *localslot);
+ else
+ found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+ remoteslot, *localslot);
+
+ return found;
+}
+
+/*
* Handle TRUNCATE message.
*
* TODO: FDW support