aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils')
-rw-r--r--src/backend/utils/cache/inval.c4
-rw-r--r--src/backend/utils/cache/relcache.c57
-rw-r--r--src/backend/utils/time/snapmgr.c102
-rw-r--r--src/backend/utils/time/tqual.c164
4 files changed, 314 insertions, 13 deletions
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 4423fe01bdd..115bcac5d23 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -512,7 +512,7 @@ RegisterSnapshotInvalidation(Oid dbId, Oid relId)
* Only the local caches are flushed; this does not transmit the message
* to other backends.
*/
-static void
+void
LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
{
if (msg->id >= 0)
@@ -596,7 +596,7 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
* since that tells us we've lost some shared-inval messages and hence
* don't know what needs to be invalidated.
*/
-static void
+void
InvalidateSystemCaches(void)
{
int i;
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2810b35eea1..32313244adb 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -73,6 +73,7 @@
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/resowner_private.h"
+#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
@@ -235,7 +236,7 @@ static void formrdesc(const char *relationName, Oid relationReltype,
bool isshared, bool hasoids,
int natts, const FormData_pg_attribute *attrs);
-static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
+static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic);
static Relation AllocateRelationDesc(Form_pg_class relp);
static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
static void RelationBuildTupleDesc(Relation relation);
@@ -274,12 +275,13 @@ static void unlink_initfile(const char *initfilename);
* and must eventually be freed with heap_freetuple.
*/
static HeapTuple
-ScanPgRelation(Oid targetRelId, bool indexOK)
+ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic)
{
HeapTuple pg_class_tuple;
Relation pg_class_desc;
SysScanDesc pg_class_scan;
ScanKeyData key[1];
+ Snapshot snapshot;
/*
* If something goes wrong during backend startup, we might find ourselves
@@ -305,9 +307,20 @@ ScanPgRelation(Oid targetRelId, bool indexOK)
* scan by setting indexOK == false.
*/
pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
+
+ /*
+ * The caller might need a tuple that's newer than the one the historic
+ * snapshot; currently the only case requiring to do so is looking up the
+ * relfilenode of non mapped system relations during decoding.
+ */
+ if (force_non_historic)
+ snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId);
+ else
+ snapshot = GetCatalogSnapshot(RelationRelationId);
+
pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
indexOK && criticalRelcachesBuilt,
- NULL,
+ snapshot,
1, key);
pg_class_tuple = systable_getnext(pg_class_scan);
@@ -836,7 +849,7 @@ RelationBuildDesc(Oid targetRelId, bool insertIt)
/*
* find the tuple in pg_class corresponding to the given relation id
*/
- pg_class_tuple = ScanPgRelation(targetRelId, true);
+ pg_class_tuple = ScanPgRelation(targetRelId, true, false);
/*
* if no such tuple exists, return NULL
@@ -989,8 +1002,42 @@ RelationInitPhysicalAddr(Relation relation)
relation->rd_node.dbNode = InvalidOid;
else
relation->rd_node.dbNode = MyDatabaseId;
+
if (relation->rd_rel->relfilenode)
+ {
+ /*
+ * Even if we are using a decoding snapshot that doesn't represent
+ * the current state of the catalog we need to make sure the
+ * filenode points to the current file since the older file will
+ * be gone (or truncated). The new file will still contain older
+ * rows so lookups in them will work correctly. This wouldn't work
+ * correctly if rewrites were allowed to change the schema in a
+ * noncompatible way, but those are prevented both on catalog
+ * tables and on user tables declared as additional catalog
+ * tables.
+ */
+ if (HistoricSnapshotActive()
+ && RelationIsAccessibleInLogicalDecoding(relation)
+ && IsTransactionState())
+ {
+ HeapTuple phys_tuple;
+ Form_pg_class physrel;
+
+ phys_tuple = ScanPgRelation(RelationGetRelid(relation),
+ RelationGetRelid(relation) != ClassOidIndexId,
+ true);
+ if (!HeapTupleIsValid(phys_tuple))
+ elog(ERROR, "could not find pg_class entry for %u",
+ RelationGetRelid(relation));
+ physrel = (Form_pg_class) GETSTRUCT(phys_tuple);
+
+ relation->rd_rel->reltablespace = physrel->reltablespace;
+ relation->rd_rel->relfilenode = physrel->relfilenode;
+ heap_freetuple(phys_tuple);
+ }
+
relation->rd_node.relNode = relation->rd_rel->relfilenode;
+ }
else
{
/* Consult the relation mapper */
@@ -1742,7 +1789,7 @@ RelationReloadIndexInfo(Relation relation)
* for pg_class_oid_index ...
*/
indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
- pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
+ pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false);
if (!HeapTupleIsValid(pg_class_tuple))
elog(ERROR, "could not find pg_class tuple for index %u",
RelationGetRelid(relation));
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 4c0e0accc1c..4146527d2fd 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -19,6 +19,10 @@
* have regd_count = 1 and are counted in RegisteredSnapshots, but are not
* tracked by any resource owner.
*
+ * The same is true for historic snapshots used during logical decoding,
+ * their lifetime is managed separately (as they life longer as one xact.c
+ * transaction).
+ *
* These arrangements let us reset MyPgXact->xmin when there are no snapshots
* referenced by this transaction. (One possible improvement would be to be
* able to advance Xmin when the snapshot with the earliest Xmin is no longer
@@ -69,12 +73,13 @@
*/
static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC};
static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC};
-static SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};
+SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC};
/* Pointers to valid snapshots */
static Snapshot CurrentSnapshot = NULL;
static Snapshot SecondarySnapshot = NULL;
static Snapshot CatalogSnapshot = NULL;
+static Snapshot HistoricSnapshot = NULL;
/*
* Staleness detection for CatalogSnapshot.
@@ -86,13 +91,18 @@ static bool CatalogSnapshotStale = true;
* for the convenience of TransactionIdIsInProgress: even in bootstrap
* mode, we don't want it to say that BootstrapTransactionId is in progress.
*
- * RecentGlobalXmin is initialized to InvalidTransactionId, to ensure that no
- * one tries to use a stale value. Readers should ensure that it has been set
- * to something else before using it.
+ * RecentGlobalXmin and RecentGlobalDataXmin are initialized to
+ * InvalidTransactionId, to ensure that no one tries to use a stale
+ * value. Readers should ensure that it has been set to something else
+ * before using it.
*/
TransactionId TransactionXmin = FirstNormalTransactionId;
TransactionId RecentXmin = FirstNormalTransactionId;
TransactionId RecentGlobalXmin = InvalidTransactionId;
+TransactionId RecentGlobalDataXmin = InvalidTransactionId;
+
+/* (table, ctid) => (cmin, cmax) mapping during timetravel */
+static HTAB *tuplecid_data = NULL;
/*
* Elements of the active snapshot stack.
@@ -158,6 +168,18 @@ static void SnapshotResetXmin(void);
Snapshot
GetTransactionSnapshot(void)
{
+ /*
+ * Return historic snapshot if doing logical decoding. We'll never
+ * need a non-historic transaction snapshot in this (sub-)transaction, so
+ * there's no need to be careful to set one up for later calls to
+ * GetTransactionSnapshot().
+ */
+ if (HistoricSnapshotActive())
+ {
+ Assert(!FirstSnapshotSet);
+ return HistoricSnapshot;
+ }
+
/* First call in transaction? */
if (!FirstSnapshotSet)
{
@@ -214,6 +236,13 @@ GetTransactionSnapshot(void)
Snapshot
GetLatestSnapshot(void)
{
+ /*
+ * So far there are no cases requiring support for GetLatestSnapshot()
+ * during logical decoding, but it wouldn't be hard to add if
+ * required.
+ */
+ Assert(!HistoricSnapshotActive());
+
/* If first call in transaction, go ahead and set the xact snapshot */
if (!FirstSnapshotSet)
return GetTransactionSnapshot();
@@ -232,6 +261,26 @@ Snapshot
GetCatalogSnapshot(Oid relid)
{
/*
+ * Return historic snapshot if we're doing logical decoding, but
+ * return a non-historic, snapshot if we temporarily are doing up2date
+ * lookups.
+ */
+ if (HistoricSnapshotActive())
+ return HistoricSnapshot;
+
+ return GetNonHistoricCatalogSnapshot(relid);
+}
+
+/*
+ * GetNonHistoricCatalogSnapshot
+ * Get a snapshot that is sufficiently up-to-date for scan of the system
+ * catalog with the specified OID, even while historic snapshots are set
+ * up.
+ */
+Snapshot
+GetNonHistoricCatalogSnapshot(Oid relid)
+{
+ /*
* If the caller is trying to scan a relation that has no syscache,
* no catcache invalidations will be sent when it is updated. For a
* a few key relations, snapshot invalidations are sent instead. If
@@ -303,6 +352,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
Assert(RegisteredSnapshots == 0);
Assert(FirstXactSnapshot == NULL);
+ Assert(HistoricSnapshotActive());
/*
* Even though we are not going to use the snapshot it computes, we must
@@ -796,7 +846,7 @@ AtEOXact_Snapshot(bool isCommit)
* Returns the token (the file name) that can be used to import this
* snapshot.
*/
-static char *
+char *
ExportSnapshot(Snapshot snapshot)
{
TransactionId topXid;
@@ -1258,3 +1308,45 @@ ThereAreNoPriorRegisteredSnapshots(void)
return false;
}
+
+/*
+ * Setup a snapshot that replaces normal catalog snapshots that allows catalog
+ * access to behave just like it did at a certain point in the past.
+ *
+ * Needed for logical decoding.
+ */
+void
+SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
+{
+ Assert(historic_snapshot != NULL);
+
+ /* setup the timetravel snapshot */
+ HistoricSnapshot = historic_snapshot;
+
+ /* setup (cmin, cmax) lookup hash */
+ tuplecid_data = tuplecids;
+}
+
+
+/*
+ * Make catalog snapshots behave normally again.
+ */
+void
+TeardownHistoricSnapshot(bool is_error)
+{
+ HistoricSnapshot = NULL;
+ tuplecid_data = NULL;
+}
+
+bool
+HistoricSnapshotActive(void)
+{
+ return HistoricSnapshot != NULL;
+}
+
+HTAB *
+HistoricSnapshotGetTupleCids(void)
+{
+ Assert(HistoricSnapshotActive());
+ return tuplecid_data;
+}
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index f6267552573..c4732ed3110 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -62,6 +62,9 @@
#include "access/xact.h"
#include "storage/bufmgr.h"
#include "storage/procarray.h"
+#include "utils/builtins.h"
+#include "utils/combocid.h"
+#include "utils/snapmgr.h"
#include "utils/tqual.h"
@@ -73,7 +76,6 @@ SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
/* local functions */
static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
-
/*
* SetHintBits()
*
@@ -1545,3 +1547,163 @@ HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
*/
return true;
}
+
+/*
+ * check whether the transaciont id 'xid' in in the pre-sorted array 'xip'.
+ */
+static bool
+TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
+{
+ return bsearch(&xid, xip, num,
+ sizeof(TransactionId), xidComparator) != NULL;
+}
+
+/*
+ * See the comments for HeapTupleSatisfiesMVCC for the semantics this function
+ * obeys.
+ *
+ * Only usable on tuples from catalog tables!
+ *
+ * We don't need to support HEAP_MOVED_(IN|OFF) for now because we only support
+ * reading catalog pages which couldn't have been created in an older version.
+ *
+ * We don't set any hint bits in here as it seems unlikely to be beneficial as
+ * those should already be set by normal access and it seems to be too
+ * dangerous to do so as the semantics of doing so during timetravel are more
+ * complicated than when dealing "only" with the present.
+ */
+bool
+HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
+ Buffer buffer)
+{
+ HeapTupleHeader tuple = htup->t_data;
+ TransactionId xmin = HeapTupleHeaderGetXmin(tuple);
+ TransactionId xmax = HeapTupleHeaderGetRawXmax(tuple);
+
+ Assert(ItemPointerIsValid(&htup->t_self));
+ Assert(htup->t_tableOid != InvalidOid);
+
+ /* inserting transaction aborted */
+ if (HeapTupleHeaderXminInvalid(tuple))
+ {
+ Assert(!TransactionIdDidCommit(xmin));
+ return false;
+ }
+ /* check if its one of our txids, toplevel is also in there */
+ else if (TransactionIdInArray(xmin, snapshot->subxip, snapshot->subxcnt))
+ {
+ bool resolved;
+ CommandId cmin = HeapTupleHeaderGetRawCommandId(tuple);
+ CommandId cmax = InvalidCommandId;
+
+ /*
+ * another transaction might have (tried to) delete this tuple or
+ * cmin/cmax was stored in a combocid. S we need to to lookup the
+ * actual values externally.
+ */
+ resolved = ResolveCminCmaxDuringDecoding(HistoricSnapshotGetTupleCids(), snapshot,
+ htup, buffer,
+ &cmin, &cmax);
+
+ if (!resolved)
+ elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+
+ Assert(cmin != InvalidCommandId);
+
+ if (cmin >= snapshot->curcid)
+ return false; /* inserted after scan started */
+ /* fall through */
+ }
+ /* committed before our xmin horizon. Do a normal visibility check. */
+ else if (TransactionIdPrecedes(xmin, snapshot->xmin))
+ {
+ Assert(!(HeapTupleHeaderXminCommitted(tuple) &&
+ !TransactionIdDidCommit(xmin)));
+
+ /* check for hint bit first, consult clog afterwards */
+ if (!HeapTupleHeaderXminCommitted(tuple) &&
+ !TransactionIdDidCommit(xmin))
+ return false;
+ /* fall through */
+ }
+ /* beyond our xmax horizon, i.e. invisible */
+ else if (TransactionIdFollowsOrEquals(xmin, snapshot->xmax))
+ {
+ return false;
+ }
+ /* check if it's a committed transaction in [xmin, xmax) */
+ else if(TransactionIdInArray(xmin, snapshot->xip, snapshot->xcnt))
+ {
+ /* fall through */
+ }
+ /*
+ * none of the above, i.e. between [xmin, xmax) but hasn't
+ * committed. I.e. invisible.
+ */
+ else
+ {
+ return false;
+ }
+
+ /* at this point we know xmin is visible, go on to check xmax */
+
+ /* xid invalid or aborted */
+ if (tuple->t_infomask & HEAP_XMAX_INVALID)
+ return true;
+ /* locked tuples are always visible */
+ else if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask))
+ return true;
+ /*
+ * We can see multis here if we're looking at user tables or if
+ * somebody SELECT ... FOR SHARE/UPDATE a system table.
+ */
+ else if (tuple->t_infomask & HEAP_XMAX_IS_MULTI)
+ {
+ xmax = HeapTupleGetUpdateXid(tuple);
+ }
+
+ /* check if its one of our txids, toplevel is also in there */
+ if (TransactionIdInArray(xmax, snapshot->subxip, snapshot->subxcnt))
+ {
+ bool resolved;
+ CommandId cmin;
+ CommandId cmax = HeapTupleHeaderGetRawCommandId(tuple);
+
+ /* Lookup actual cmin/cmax values */
+ resolved = ResolveCminCmaxDuringDecoding(HistoricSnapshotGetTupleCids(), snapshot,
+ htup, buffer,
+ &cmin, &cmax);
+
+ if (!resolved)
+ elog(ERROR, "could not resolve combocid to cmax");
+
+ Assert(cmax != InvalidCommandId);
+
+ if (cmax >= snapshot->curcid)
+ return true; /* deleted after scan started */
+ else
+ return false; /* deleted before scan started */
+ }
+ /* below xmin horizon, normal transaction state is valid */
+ else if (TransactionIdPrecedes(xmax, snapshot->xmin))
+ {
+ Assert(!(tuple->t_infomask & HEAP_XMAX_COMMITTED &&
+ !TransactionIdDidCommit(xmax)));
+
+ /* check hint bit first */
+ if (tuple->t_infomask & HEAP_XMAX_COMMITTED)
+ return false;
+
+ /* check clog */
+ return !TransactionIdDidCommit(xmax);
+ }
+ /* above xmax horizon, we cannot possibly see the deleting transaction */
+ else if (TransactionIdFollowsOrEquals(xmax, snapshot->xmax))
+ return true;
+ /* xmax is between [xmin, xmax), check known committed array */
+ else if (TransactionIdInArray(xmax, snapshot->xip, snapshot->xcnt))
+ return false;
+ /* xmax is between [xmin, xmax), but known not to have committed yet */
+ else
+ return true;
+}