diff options
Diffstat (limited to 'src/backend/utils')
-rw-r--r-- | src/backend/utils/cache/inval.c | 4 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 57 | ||||
-rw-r--r-- | src/backend/utils/time/snapmgr.c | 102 | ||||
-rw-r--r-- | src/backend/utils/time/tqual.c | 164 |
4 files changed, 314 insertions, 13 deletions
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 4423fe01bdd..115bcac5d23 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -512,7 +512,7 @@ RegisterSnapshotInvalidation(Oid dbId, Oid relId) * Only the local caches are flushed; this does not transmit the message * to other backends. */ -static void +void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) { if (msg->id >= 0) @@ -596,7 +596,7 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) * since that tells us we've lost some shared-inval messages and hence * don't know what needs to be invalidated. */ -static void +void InvalidateSystemCaches(void) { int i; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2810b35eea1..32313244adb 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -73,6 +73,7 @@ #include "utils/memutils.h" #include "utils/relmapper.h" #include "utils/resowner_private.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" @@ -235,7 +236,7 @@ static void formrdesc(const char *relationName, Oid relationReltype, bool isshared, bool hasoids, int natts, const FormData_pg_attribute *attrs); -static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK); +static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic); static Relation AllocateRelationDesc(Form_pg_class relp); static void RelationParseRelOptions(Relation relation, HeapTuple tuple); static void RelationBuildTupleDesc(Relation relation); @@ -274,12 +275,13 @@ static void unlink_initfile(const char *initfilename); * and must eventually be freed with heap_freetuple. 
*/ static HeapTuple -ScanPgRelation(Oid targetRelId, bool indexOK) +ScanPgRelation(Oid targetRelId, bool indexOK, bool force_non_historic) { HeapTuple pg_class_tuple; Relation pg_class_desc; SysScanDesc pg_class_scan; ScanKeyData key[1]; + Snapshot snapshot; /* * If something goes wrong during backend startup, we might find ourselves @@ -305,9 +307,20 @@ ScanPgRelation(Oid targetRelId, bool indexOK) * scan by setting indexOK == false. */ pg_class_desc = heap_open(RelationRelationId, AccessShareLock); + + /* + * The caller might need a tuple that's newer than the one the historic + * snapshot sees; currently the only case requiring this is looking up the + * relfilenode of non-mapped system relations during decoding. + */ + if (force_non_historic) + snapshot = GetNonHistoricCatalogSnapshot(RelationRelationId); + else + snapshot = GetCatalogSnapshot(RelationRelationId); + pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId, indexOK && criticalRelcachesBuilt, - NULL, + snapshot, 1, key); pg_class_tuple = systable_getnext(pg_class_scan); @@ -836,7 +849,7 @@ RelationBuildDesc(Oid targetRelId, bool insertIt) /* * find the tuple in pg_class corresponding to the given relation id */ - pg_class_tuple = ScanPgRelation(targetRelId, true); + pg_class_tuple = ScanPgRelation(targetRelId, true, false); /* * if no such tuple exists, return NULL @@ -989,8 +1002,42 @@ RelationInitPhysicalAddr(Relation relation) relation->rd_node.dbNode = InvalidOid; else relation->rd_node.dbNode = MyDatabaseId; + if (relation->rd_rel->relfilenode) + { + /* + * Even if we are using a decoding snapshot that doesn't represent + * the current state of the catalog we need to make sure the + * filenode points to the current file since the older file will + * be gone (or truncated). The new file will still contain older + * rows so lookups in them will work correctly. 
This wouldn't work + * correctly if rewrites were allowed to change the schema in a + * noncompatible way, but those are prevented both on catalog + * tables and on user tables declared as additional catalog + * tables. + */ + if (HistoricSnapshotActive() + && RelationIsAccessibleInLogicalDecoding(relation) + && IsTransactionState()) + { + HeapTuple phys_tuple; + Form_pg_class physrel; + + phys_tuple = ScanPgRelation(RelationGetRelid(relation), + RelationGetRelid(relation) != ClassOidIndexId, + true); + if (!HeapTupleIsValid(phys_tuple)) + elog(ERROR, "could not find pg_class entry for %u", + RelationGetRelid(relation)); + physrel = (Form_pg_class) GETSTRUCT(phys_tuple); + + relation->rd_rel->reltablespace = physrel->reltablespace; + relation->rd_rel->relfilenode = physrel->relfilenode; + heap_freetuple(phys_tuple); + } + relation->rd_node.relNode = relation->rd_rel->relfilenode; + } else { /* Consult the relation mapper */ @@ -1742,7 +1789,7 @@ RelationReloadIndexInfo(Relation relation) * for pg_class_oid_index ... */ indexOK = (RelationGetRelid(relation) != ClassOidIndexId); - pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK); + pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK, false); if (!HeapTupleIsValid(pg_class_tuple)) elog(ERROR, "could not find pg_class tuple for index %u", RelationGetRelid(relation)); diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 4c0e0accc1c..4146527d2fd 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -19,6 +19,10 @@ * have regd_count = 1 and are counted in RegisteredSnapshots, but are not * tracked by any resource owner. * + * The same is true for historic snapshots used during logical decoding, + * their lifetime is managed separately (as they life longer as one xact.c + * transaction). + * * These arrangements let us reset MyPgXact->xmin when there are no snapshots * referenced by this transaction. 
(One possible improvement would be to be * able to advance Xmin when the snapshot with the earliest Xmin is no longer @@ -69,12 +73,13 @@ */ static SnapshotData CurrentSnapshotData = {HeapTupleSatisfiesMVCC}; static SnapshotData SecondarySnapshotData = {HeapTupleSatisfiesMVCC}; -static SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC}; +SnapshotData CatalogSnapshotData = {HeapTupleSatisfiesMVCC}; /* Pointers to valid snapshots */ static Snapshot CurrentSnapshot = NULL; static Snapshot SecondarySnapshot = NULL; static Snapshot CatalogSnapshot = NULL; +static Snapshot HistoricSnapshot = NULL; /* * Staleness detection for CatalogSnapshot. @@ -86,13 +91,18 @@ static bool CatalogSnapshotStale = true; * for the convenience of TransactionIdIsInProgress: even in bootstrap * mode, we don't want it to say that BootstrapTransactionId is in progress. * - * RecentGlobalXmin is initialized to InvalidTransactionId, to ensure that no - * one tries to use a stale value. Readers should ensure that it has been set - * to something else before using it. + * RecentGlobalXmin and RecentGlobalDataXmin are initialized to + * InvalidTransactionId, to ensure that no one tries to use a stale + * value. Readers should ensure that it has been set to something else + * before using it. */ TransactionId TransactionXmin = FirstNormalTransactionId; TransactionId RecentXmin = FirstNormalTransactionId; TransactionId RecentGlobalXmin = InvalidTransactionId; +TransactionId RecentGlobalDataXmin = InvalidTransactionId; + +/* (table, ctid) => (cmin, cmax) mapping during timetravel */ +static HTAB *tuplecid_data = NULL; /* * Elements of the active snapshot stack. @@ -158,6 +168,18 @@ static void SnapshotResetXmin(void); Snapshot GetTransactionSnapshot(void) { + /* + * Return historic snapshot if doing logical decoding. 
We'll never + * need a non-historic transaction snapshot in this (sub-)transaction, so + * there's no need to be careful to set one up for later calls to + * GetTransactionSnapshot(). + */ + if (HistoricSnapshotActive()) + { + Assert(!FirstSnapshotSet); + return HistoricSnapshot; + } + /* First call in transaction? */ if (!FirstSnapshotSet) { @@ -214,6 +236,13 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { + /* + * So far there are no cases requiring support for GetLatestSnapshot() + * during logical decoding, but it wouldn't be hard to add if + * required. + */ + Assert(!HistoricSnapshotActive()); + /* If first call in transaction, go ahead and set the xact snapshot */ if (!FirstSnapshotSet) return GetTransactionSnapshot(); @@ -232,6 +261,26 @@ Snapshot GetCatalogSnapshot(Oid relid) { /* + * Return the historic snapshot if we're doing logical decoding; otherwise + * fall through to the regular, non-historic catalog snapshot used for + * up-to-date lookups. + */ + if (HistoricSnapshotActive()) + return HistoricSnapshot; + + return GetNonHistoricCatalogSnapshot(relid); +} + +/* + * GetNonHistoricCatalogSnapshot + * Get a snapshot that is sufficiently up-to-date for scan of the system + * catalog with the specified OID, even while historic snapshots are set + * up. + */ +Snapshot +GetNonHistoricCatalogSnapshot(Oid relid) +{ + /* * If the caller is trying to scan a relation that has no syscache, * no catcache invalidations will be sent when it is updated. For a * a few key relations, snapshot invalidations are sent instead. If @@ -303,6 +352,7 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) Assert(RegisteredSnapshots == 0); Assert(FirstXactSnapshot == NULL); + Assert(!HistoricSnapshotActive()); /* a historic snapshot would shadow the imported one */ /* * Even though we are not going to use the snapshot it computes, we must @@ -796,7 +846,7 @@ AtEOXact_Snapshot(bool isCommit) * Returns the token (the file name) that can be used to import this * snapshot. 
*/ -static char * +char * ExportSnapshot(Snapshot snapshot) { TransactionId topXid; @@ -1258,3 +1308,45 @@ ThereAreNoPriorRegisteredSnapshots(void) return false; } + +/* + * Set up a snapshot that replaces the normal catalog snapshots, so that catalog + * access behaves just like it did at a certain point in the past. + * + * Needed for logical decoding. + */ +void +SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids) +{ + Assert(historic_snapshot != NULL); + + /* set up the timetravel snapshot */ + HistoricSnapshot = historic_snapshot; + + /* set up the (cmin, cmax) lookup hash */ + tuplecid_data = tuplecids; +} + + +/* + * Make catalog snapshots behave normally again ('is_error' is currently unused). + */ +void +TeardownHistoricSnapshot(bool is_error) +{ + HistoricSnapshot = NULL; + tuplecid_data = NULL; +} + +bool +HistoricSnapshotActive(void) +{ + return HistoricSnapshot != NULL; +} + +HTAB * +HistoricSnapshotGetTupleCids(void) +{ + Assert(HistoricSnapshotActive()); + return tuplecid_data; +} diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index f6267552573..c4732ed3110 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -62,6 +62,9 @@ #include "access/xact.h" #include "storage/bufmgr.h" #include "storage/procarray.h" +#include "utils/builtins.h" +#include "utils/combocid.h" +#include "utils/snapmgr.h" #include "utils/tqual.h" @@ -73,7 +76,6 @@ SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast}; /* local functions */ static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); - /* * SetHintBits() * @@ -1545,3 +1547,163 @@ HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple) */ return true; } + +/* + * Check whether the transaction id 'xid' is in the pre-sorted array 'xip'. 
+ */ +static bool +TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num) +{ + return bsearch(&xid, xip, num, + sizeof(TransactionId), xidComparator) != NULL; +} + +/* + * See the comments for HeapTupleSatisfiesMVCC for the semantics this function + * obeys. + * + * Only usable on tuples from catalog tables! + * + * We don't need to support HEAP_MOVED_(IN|OFF) for now because we only support + * reading catalog pages which couldn't have been created in an older version. + * + * We don't set any hint bits in here as it seems unlikely to be beneficial as + * those should already be set by normal access and it seems to be too + * dangerous to do so as the semantics of doing so during timetravel are more + * complicated than when dealing "only" with the present. + */ +bool +HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot, + Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; + TransactionId xmin = HeapTupleHeaderGetXmin(tuple); + TransactionId xmax = HeapTupleHeaderGetRawXmax(tuple); + + Assert(ItemPointerIsValid(&htup->t_self)); + Assert(htup->t_tableOid != InvalidOid); + + /* inserting transaction aborted */ + if (HeapTupleHeaderXminInvalid(tuple)) + { + Assert(!TransactionIdDidCommit(xmin)); + return false; + } + /* check if it's one of our txids, toplevel is also in there */ + else if (TransactionIdInArray(xmin, snapshot->subxip, snapshot->subxcnt)) + { + bool resolved; + CommandId cmin = HeapTupleHeaderGetRawCommandId(tuple); + CommandId cmax = InvalidCommandId; + + /* + * another transaction might have (tried to) delete this tuple or + * cmin/cmax was stored in a combocid. So we need to look up the + * actual values externally. 
+ */ + resolved = ResolveCminCmaxDuringDecoding(HistoricSnapshotGetTupleCids(), snapshot, + htup, buffer, + &cmin, &cmax); + + if (!resolved) + elog(ERROR, "could not resolve cmin/cmax of catalog tuple"); + + Assert(cmin != InvalidCommandId); + + if (cmin >= snapshot->curcid) + return false; /* inserted after scan started */ + /* fall through */ + } + /* committed before our xmin horizon. Do a normal visibility check. */ + else if (TransactionIdPrecedes(xmin, snapshot->xmin)) + { + Assert(!(HeapTupleHeaderXminCommitted(tuple) && + !TransactionIdDidCommit(xmin))); + + /* check for hint bit first, consult clog afterwards */ + if (!HeapTupleHeaderXminCommitted(tuple) && + !TransactionIdDidCommit(xmin)) + return false; + /* fall through */ + } + /* beyond our xmax horizon, i.e. invisible */ + else if (TransactionIdFollowsOrEquals(xmin, snapshot->xmax)) + { + return false; + } + /* check if it's a committed transaction in [xmin, xmax) */ + else if(TransactionIdInArray(xmin, snapshot->xip, snapshot->xcnt)) + { + /* fall through */ + } + /* + * none of the above, i.e. between [xmin, xmax) but hasn't + * committed. I.e. invisible. + */ + else + { + return false; + } + + /* at this point we know xmin is visible, go on to check xmax */ + + /* xid invalid or aborted */ + if (tuple->t_infomask & HEAP_XMAX_INVALID) + return true; + /* locked tuples are always visible */ + else if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask)) + return true; + /* + * We can see multis here if we're looking at user tables or if + * somebody did SELECT ... FOR SHARE/UPDATE on a system table. 
+ */ + else if (tuple->t_infomask & HEAP_XMAX_IS_MULTI) + { + xmax = HeapTupleGetUpdateXid(tuple); + } + + /* check if it's one of our txids, toplevel is also in there */ + if (TransactionIdInArray(xmax, snapshot->subxip, snapshot->subxcnt)) + { + bool resolved; + CommandId cmin; + CommandId cmax = HeapTupleHeaderGetRawCommandId(tuple); + + /* Look up actual cmin/cmax values */ + resolved = ResolveCminCmaxDuringDecoding(HistoricSnapshotGetTupleCids(), snapshot, + htup, buffer, + &cmin, &cmax); + + if (!resolved) + elog(ERROR, "could not resolve combocid to cmax"); + + Assert(cmax != InvalidCommandId); + + if (cmax >= snapshot->curcid) + return true; /* deleted after scan started */ + else + return false; /* deleted before scan started */ + } + /* below xmin horizon, normal transaction state is valid */ + else if (TransactionIdPrecedes(xmax, snapshot->xmin)) + { + Assert(!(tuple->t_infomask & HEAP_XMAX_COMMITTED && + !TransactionIdDidCommit(xmax))); + + /* check hint bit first */ + if (tuple->t_infomask & HEAP_XMAX_COMMITTED) + return false; + + /* check clog */ + return !TransactionIdDidCommit(xmax); + } + /* above xmax horizon, we cannot possibly see the deleting transaction */ + else if (TransactionIdFollowsOrEquals(xmax, snapshot->xmax)) + return true; + /* xmax is between [xmin, xmax), check known committed array */ + else if (TransactionIdInArray(xmax, snapshot->xip, snapshot->xcnt)) + return false; + /* xmax is between [xmin, xmax), but known not to have committed yet */ + else + return true; +} |