aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/access/transam/README.parallel | 4
-rw-r--r-- src/backend/access/transam/parallel.c | 19
-rw-r--r-- src/backend/access/transam/xact.c | 4
-rw-r--r-- src/backend/utils/cache/relmapper.c | 84
-rw-r--r-- src/include/utils/relmapper.h | 6
5 files changed, 107 insertions, 10 deletions
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index f09a5806345..85e5840feba 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -125,6 +125,10 @@ worker. This includes:
- State related to pending REINDEX operations, which prevents access to
an index that is currently being rebuilt.
+ - Active relmapper.c mapping state. This is needed to allow consistent
+ answers when fetching the current relfilenode for relation oids of
+ mapped relations.
+
To prevent unprincipled deadlocks when running in parallel mode, this code
also arranges for the leader and all workers to participate in group
locking. See src/backend/storage/lmgr/README for more details.
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 30ddf94c952..c1681184670 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -37,6 +37,7 @@
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
+#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"
@@ -69,6 +70,7 @@
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
+#define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -205,6 +207,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size asnaplen = 0;
Size tstatelen = 0;
Size reindexlen = 0;
+ Size relmapperlen = 0;
Size segsize = 0;
int i;
FixedParallelState *fps;
@@ -256,8 +259,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
reindexlen = EstimateReindexStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
+ relmapperlen = EstimateRelationMapSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 8);
+ shm_toc_estimate_keys(&pcxt->estimator, 9);
/* Estimate space needed for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -327,6 +332,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *asnapspace;
char *tstatespace;
char *reindexspace;
+ char *relmapperspace;
char *error_queue_space;
char *session_dsm_handle_space;
char *entrypointstate;
@@ -373,6 +379,12 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeReindexState(reindexlen, reindexspace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+ /* Serialize relmapper state. */
+ relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
+ SerializeRelationMap(relmapperlen, relmapperspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
+ relmapperspace);
+
/* Allocate space for worker information. */
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
@@ -1205,6 +1217,7 @@ ParallelWorkerMain(Datum main_arg)
char *asnapspace;
char *tstatespace;
char *reindexspace;
+ char *relmapperspace;
StringInfoData msgbuf;
char *session_dsm_handle_space;
@@ -1380,6 +1393,10 @@ ParallelWorkerMain(Datum main_arg)
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace);
+ /* Restore relmapper state. */
+ relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
+ RestoreRelationMap(relmapperspace);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9aa63c8792b..cd8270d5fb0 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2018,7 +2018,7 @@ CommitTransaction(void)
HOLD_INTERRUPTS();
/* Commit updates to the relation map --- do this as late as possible */
- AtEOXact_RelationMap(true);
+ AtEOXact_RelationMap(true, is_parallel_worker);
/*
* set the current transaction state information appropriately during
@@ -2539,7 +2539,7 @@ AbortTransaction(void)
AtAbort_Portals();
AtEOXact_LargeObject(false);
AtAbort_Notify();
- AtEOXact_RelationMap(false);
+ AtEOXact_RelationMap(false, is_parallel_worker);
AtAbort_Twophase();
/*
diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c
index c7f0e6f6d4a..905867dc767 100644
--- a/src/backend/utils/cache/relmapper.c
+++ b/src/backend/utils/cache/relmapper.c
@@ -92,6 +92,16 @@ typedef struct RelMapFile
} RelMapFile;
/*
+ * State for serializing local and shared relmappings for parallel workers
+ * (active states only). See notes on active_* and pending_* updates state.
+ */
+typedef struct SerializedActiveRelMaps
+{
+ RelMapFile active_shared_updates; /* copy of this backend's active_shared_updates */
+ RelMapFile active_local_updates; /* copy of this backend's active_local_updates */
+} SerializedActiveRelMaps;
+
+/*
* The currently known contents of the shared map file and our database's
* local map file are stored here. These can be reloaded from disk
* immediately whenever we receive an update sinval message.
@@ -111,6 +121,9 @@ static RelMapFile local_map;
* they will become active at the next CommandCounterIncrement. This setup
* lets map updates act similarly to updates of pg_class rows, ie, they
* become visible only at the next CommandCounterIncrement boundary.
+ *
+ * Active shared and active local updates are serialized by the parallel
+ * infrastructure, and deserialized within parallel workers.
*/
static RelMapFile active_shared_updates;
static RelMapFile active_local_updates;
@@ -263,13 +276,16 @@ RelationMapUpdateMap(Oid relationId, Oid fileNode, bool shared,
else
{
/*
- * We don't currently support map changes within subtransactions. This
- * could be done with more bookkeeping infrastructure, but it doesn't
- * presently seem worth it.
+ * We don't currently support map changes within subtransactions, or
+ * when in parallel mode. This could be done with more bookkeeping
+ * infrastructure, but it doesn't presently seem worth it.
*/
if (GetCurrentTransactionNestLevel() > 1)
elog(ERROR, "cannot change relation mapping within subtransaction");
+ if (IsInParallelMode())
+ elog(ERROR, "cannot change relation mapping in parallel mode");
+
if (immediate)
{
/* Make it active, but only locally */
@@ -452,11 +468,14 @@ AtCCI_RelationMap(void)
*
* During abort, we just have to throw away any pending map changes.
* Normal post-abort cleanup will take care of fixing relcache entries.
+ * Parallel worker commit/abort is handled by resetting active mappings
+ * that may have been received from the leader process. (There should be
+ * no pending updates in parallel workers.)
*/
void
-AtEOXact_RelationMap(bool isCommit)
+AtEOXact_RelationMap(bool isCommit, bool isParallelWorker)
{
- if (isCommit)
+ if (isCommit && !isParallelWorker)
{
/*
* We should not get here with any "pending" updates. (We could
@@ -482,7 +501,10 @@ AtEOXact_RelationMap(bool isCommit)
}
else
{
- /* Abort --- drop all local and pending updates */
+ /* Abort or parallel worker --- drop all local and pending updates */
+ Assert(!isParallelWorker || pending_shared_updates.num_mappings == 0);
+ Assert(!isParallelWorker || pending_local_updates.num_mappings == 0);
+
active_shared_updates.num_mappings = 0;
active_local_updates.num_mappings = 0;
pending_shared_updates.num_mappings = 0;
@@ -615,6 +637,56 @@ RelationMapInitializePhase3(void)
}
/*
+ * EstimateRelationMapSpace
+ *
+ * Estimate space needed to pass active shared and local relmaps to parallel
+ * workers.
+ */
+Size
+EstimateRelationMapSpace(void)
+{
+	/* Both active maps are shipped together as one fixed-size struct. */
+	Size		needed = sizeof(SerializedActiveRelMaps);
+
+	return needed;
+}
+
+/*
+ * SerializeRelationMap
+ *
+ * Serialize active shared and local relmap state for parallel workers.
+ */
+void
+SerializeRelationMap(Size maxSize, char *startAddress)
+{
+	SerializedActiveRelMaps *dest = (SerializedActiveRelMaps *) startAddress;
+
+	Assert(maxSize >= EstimateRelationMapSpace());
+
+	/* Whole-struct copies of the two active update maps into the buffer. */
+	dest->active_local_updates = active_local_updates;
+	dest->active_shared_updates = active_shared_updates;
+}
+
+/*
+ * RestoreRelationMap
+ *
+ * Restore active shared and local relmap state within a parallel worker.
+ */
+void
+RestoreRelationMap(char *startAddress)
+{
+	SerializedActiveRelMaps *incoming = (SerializedActiveRelMaps *) startAddress;
+
+	/* All four update maps must be empty before the copies are installed. */
+	if (!(active_shared_updates.num_mappings == 0 &&
+		  active_local_updates.num_mappings == 0 &&
+		  pending_shared_updates.num_mappings == 0 &&
+		  pending_local_updates.num_mappings == 0))
+		elog(ERROR, "parallel worker has existing mappings");
+
+	active_local_updates = incoming->active_local_updates;
+	active_shared_updates = incoming->active_shared_updates;
+}
+
+/*
* load_relmap_file -- load data from the shared or local map file
*
* Because the map file is essential for access to core system catalogs,
diff --git a/src/include/utils/relmapper.h b/src/include/utils/relmapper.h
index f69b1006bf2..fb51943498d 100644
--- a/src/include/utils/relmapper.h
+++ b/src/include/utils/relmapper.h
@@ -48,7 +48,7 @@ extern void RelationMapInvalidate(bool shared);
extern void RelationMapInvalidateAll(void);
extern void AtCCI_RelationMap(void);
-extern void AtEOXact_RelationMap(bool isCommit);
+extern void AtEOXact_RelationMap(bool isCommit, bool isParallelWorker);
extern void AtPrepare_RelationMap(void);
extern void CheckPointRelationMap(void);
@@ -59,6 +59,10 @@ extern void RelationMapInitialize(void);
extern void RelationMapInitializePhase2(void);
extern void RelationMapInitializePhase3(void);
+extern Size EstimateRelationMapSpace(void);
+extern void SerializeRelationMap(Size maxSize, char *startAddress);
+extern void RestoreRelationMap(char *startAddress);
+
extern void relmap_redo(XLogReaderState *record);
extern void relmap_desc(StringInfo buf, XLogReaderState *record);
extern const char *relmap_identify(uint8 info);