aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/README.parallel3
-rw-r--r--src/backend/access/transam/parallel.c18
-rw-r--r--src/backend/catalog/index.c75
-rw-r--r--src/include/catalog/index.h4
4 files changed, 98 insertions, 2 deletions
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index 5c33c40ae95..32994719e3b 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -122,6 +122,9 @@ worker. This includes:
values are restored, this incidentally sets SessionUserId and OuterUserId
to the correct values. This final step restores CurrentUserId.
+ - State related to pending REINDEX operations, which prevents access to
+ an index that is currently being rebuilt.
+
To prevent undetected or unprincipled deadlocks when running in parallel mode,
this could should eventually handle heavyweight locks in some way. This is
not implemented yet.
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index f720896e507..0a0157a8781 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -18,6 +18,7 @@
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
+#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
@@ -67,6 +68,7 @@
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
+#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
/* Fixed-size parallel state. */
typedef struct FixedParallelState
@@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
Size tsnaplen = 0;
Size asnaplen = 0;
Size tstatelen = 0;
+ Size reindexlen = 0;
Size segsize = 0;
int i;
FixedParallelState *fps;
@@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
tstatelen = EstimateTransactionStateSpace();
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
+ reindexlen = EstimateReindexStateSpace();
+ shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
/* If you add more chunks here, you probably need to add keys. */
- shm_toc_estimate_keys(&pcxt->estimator, 7);
+ shm_toc_estimate_keys(&pcxt->estimator, 8);
/* Estimate space need for error queues. */
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
@@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
char *tsnapspace;
char *asnapspace;
char *tstatespace;
+ char *reindexspace;
char *error_queue_space;
char *session_dsm_handle_space;
char *entrypointstate;
@@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
SerializeTransactionState(tstatelen, tstatespace);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
+ /* Serialize reindex state. */
+ reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
+ SerializeReindexState(reindexlen, reindexspace);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
+
/* Allocate space for worker information. */
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
@@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg)
char *tsnapspace;
char *asnapspace;
char *tstatespace;
+ char *reindexspace;
StringInfoData msgbuf;
char *session_dsm_handle_space;
@@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg)
/* Set ParallelMasterBackendId so we know how to address temp relations. */
ParallelMasterBackendId = fps->parallel_master_backend_id;
+ /* Restore reindex state. */
+ reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
+ RestoreReindexState(reindexspace);
+
/*
* We've initialized all of our state now; nothing should change
* hereafter.
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 330488b96f5..007b929a6fa 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -86,6 +86,18 @@ typedef struct
tups_inserted;
} v_i_state;
+/*
+ * Pointer-free representation of variables used when reindexing system
+ * catalogs; we use this to propagate those values to parallel workers.
+ */
+typedef struct
+{
+ Oid currentlyReindexedHeap;
+ Oid currentlyReindexedIndex;
+ int numPendingReindexedIndexes;
+ Oid pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
+} SerializedReindexState;
+
/* non-export function prototypes */
static bool relationHasPrimaryKey(Relation rel);
static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
@@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options)
* When we are busy reindexing a system index, this code provides support
* for preventing catalog lookups from using that index. We also make use
* of this to catch attempted uses of user indexes during reindexing of
- * those indexes.
+ * those indexes. This information is propagated to parallel workers;
+ * attempting to change it during a parallel operation is not permitted.
* ----------------------------------------------------------------
*/
@@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid)
static void
ResetReindexProcessing(void)
{
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
currentlyReindexedHeap = InvalidOid;
currentlyReindexedIndex = InvalidOid;
}
@@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes)
/* Reindexing is not re-entrant. */
if (pendingReindexedIndexes)
elog(ERROR, "cannot reindex while reindexing");
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = list_copy(indexes);
}
@@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes)
static void
RemoveReindexPending(Oid indexOid)
{
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
indexOid);
}
@@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid)
static void
ResetReindexPending(void)
{
+ if (IsInParallelMode())
+ elog(ERROR, "cannot modify reindex state during a parallel operation");
pendingReindexedIndexes = NIL;
}
+
+/*
+ * EstimateReindexStateSpace
+ * Estimate space needed to pass reindex state to parallel workers.
+ */
+extern Size
+EstimateReindexStateSpace(void)
+{
+ return offsetof(SerializedReindexState, pendingReindexedIndexes)
+ + mul_size(sizeof(Oid), list_length(pendingReindexedIndexes));
+}
+
+/*
+ * SerializeReindexState
+ * Serialize reindex state for parallel workers.
+ */
+extern void
+SerializeReindexState(Size maxsize, char *start_address)
+{
+ SerializedReindexState *sistate = (SerializedReindexState *) start_address;
+ int c = 0;
+ ListCell *lc;
+
+ sistate->currentlyReindexedHeap = currentlyReindexedHeap;
+ sistate->currentlyReindexedIndex = currentlyReindexedIndex;
+ sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes);
+ foreach(lc, pendingReindexedIndexes)
+ sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc);
+}
+
+/*
+ * RestoreReindexState
+ * Restore reindex state in a parallel worker.
+ */
+extern void
+RestoreReindexState(void *reindexstate)
+{
+ SerializedReindexState *sistate = (SerializedReindexState *) reindexstate;
+ int c = 0;
+ MemoryContext oldcontext;
+
+ currentlyReindexedHeap = sistate->currentlyReindexedHeap;
+ currentlyReindexedIndex = sistate->currentlyReindexedIndex;
+
+ Assert(pendingReindexedIndexes == NIL);
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ for (c = 0; c < sistate->numPendingReindexedIndexes; ++c)
+ pendingReindexedIndexes =
+ lappend_oid(pendingReindexedIndexes,
+ sistate->pendingReindexedIndexes[c]);
+ MemoryContextSwitchTo(oldcontext);
+}
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 12bf35567a7..4790f0c735f 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid);
extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
+extern Size EstimateReindexStateSpace(void);
+extern void SerializeReindexState(Size maxsize, char *start_address);
+extern void RestoreReindexState(void *reindexstate);
+
#endif /* INDEX_H */