aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/launcher.c3
-rw-r--r--src/backend/replication/logical/worker.c82
-rw-r--r--src/backend/storage/file/Makefile1
-rw-r--r--src/backend/storage/file/buffile.c84
-rw-r--r--src/backend/storage/file/fd.c2
-rw-r--r--src/backend/storage/file/fileset.c205
-rw-r--r--src/backend/storage/file/sharedfileset.c244
-rw-r--r--src/backend/utils/sort/logtape.c8
-rw-r--r--src/backend/utils/sort/sharedtuplestore.c5
-rw-r--r--src/include/replication/worker_internal.h1
-rw-r--r--src/include/storage/buffile.h14
-rw-r--r--src/include/storage/fileset.h40
-rw-r--r--src/include/storage/sharedfileset.h14
-rw-r--r--src/tools/pgindent/typedefs.list1
14 files changed, 368 insertions, 336 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e3b11daa897..8b1772db69e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -648,6 +648,9 @@ logicalrep_worker_onexit(int code, Datum arg)
logicalrep_worker_detach();
+ /* Cleanup filesets used for streaming transactions. */
+ logicalrep_worker_cleanupfileset();
+
ApplyLauncherWakeup();
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 295b1e06de7..bfb7d1a261c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -39,13 +39,13 @@
* BufFile infrastructure supports temporary files that exceed the OS file size
* limit, (b) provides a way for automatic clean up on the error and (c) provides
* a way to survive these files across local transactions and allow to open and
- * close at stream start and close. We decided to use SharedFileSet
+ * close at stream start and close. We decided to use FileSet
* infrastructure as without that it deletes the files on the closure of the
* file and if we decide to keep stream files open across the start/stop stream
* then it will consume a lot of memory (more than 8K for each BufFile and
* there could be multiple such BufFiles as the subscriber could receive
* multiple start/stop streams for different transactions before getting the
- * commit). Moreover, if we don't use SharedFileSet then we also need to invent
+ * commit). Moreover, if we don't use FileSet then we also need to invent
* a new way to pass filenames to BufFile APIs so that we are allowed to open
* the file we desired across multiple stream-open calls for the same
* transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
typedef struct StreamXidHash
{
TransactionId xid; /* xid is the hash key and must be first */
- SharedFileSet *stream_fileset; /* shared file set for stream data */
- SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+ FileSet *stream_fileset; /* file set for stream data */
+ FileSet *subxact_fileset; /* file set for subxact info */
} StreamXidHash;
static MemoryContext ApplyMessageContext = NULL;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
/*
- * Hash table for storing the streaming xid information along with shared file
- * set for streaming and subxact files.
+ * Hash table for storing the streaming xid information along with filesets
+ * for streaming and subxact files.
*/
static HTAB *xidhash = NULL;
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
- fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
/* OK, truncate the file at the right offset */
- BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
- subxact_data.subxacts[subidx].offset);
+ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
+ subxact_data.subxacts[subidx].offset);
BufFileClose(fd);
/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
errmsg_internal("transaction %u not found in stream XID hash table",
xid)));
- fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -2542,6 +2542,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
/*
+ * Cleanup filesets.
+ */
+void
+logicalrep_worker_cleanupfileset(void)
+{
+ HASH_SEQ_STATUS status;
+ StreamXidHash *hentry;
+
+ /* Remove all the pending stream and subxact filesets. */
+ if (xidhash)
+ {
+ hash_seq_init(&status, xidhash);
+ while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
+ {
+ FileSetDeleteAll(hentry->stream_fileset);
+
+ /* Delete the subxact fileset iff it is created. */
+ if (hentry->subxact_fileset)
+ FileSetDeleteAll(hentry->subxact_fileset);
+ }
+ }
+}
+
+/*
* Apply main loop.
*/
static void
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
if (ent->subxact_fileset)
{
cleanup_subxact_info();
- SharedFileSetDeleteAll(ent->subxact_fileset);
+ FileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
MemoryContext oldctx;
/*
- * We need to maintain shared fileset across multiple stream
- * start/stop calls. So, need to allocate it in a persistent context.
+ * We need to maintain fileset across multiple stream start/stop
+ * calls. So, need to allocate it in a persistent context.
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
- ent->subxact_fileset = palloc(sizeof(SharedFileSet));
- SharedFileSetInit(ent->subxact_fileset, NULL);
+ ent->subxact_fileset = palloc(sizeof(FileSet));
+ FileSetInit(ent->subxact_fileset);
MemoryContextSwitchTo(oldctx);
- fd = BufFileCreateShared(ent->subxact_fileset, path);
+ fd = BufFileCreateFileSet(ent->subxact_fileset, path);
}
else
- fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
subxact_filename(path, subid, xid);
- fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3264,7 +3288,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
/* Delete the change file and release the stream fileset memory */
changes_filename(path, subid, xid);
- SharedFileSetDeleteAll(ent->stream_fileset);
+ FileSetDeleteAll(ent->stream_fileset);
pfree(ent->stream_fileset);
ent->stream_fileset = NULL;
@@ -3272,7 +3296,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
if (ent->subxact_fileset)
{
subxact_filename(path, subid, xid);
- SharedFileSetDeleteAll(ent->subxact_fileset);
+ FileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the shared fileset and create the
- * buffile, otherwise open the previously created file.
+ * changes for this transaction, initialize the fileset and create the buffile,
+ * otherwise open the previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
@@ -3330,7 +3354,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
if (first_segment)
{
MemoryContext savectx;
- SharedFileSet *fileset;
+ FileSet *fileset;
if (found)
ereport(ERROR,
@@ -3338,16 +3362,16 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
/*
- * We need to maintain shared fileset across multiple stream
- * start/stop calls. So, need to allocate it in a persistent context.
+ * We need to maintain fileset across multiple stream start/stop
+ * calls. So, need to allocate it in a persistent context.
*/
savectx = MemoryContextSwitchTo(ApplyContext);
- fileset = palloc(sizeof(SharedFileSet));
+ fileset = palloc(sizeof(FileSet));
- SharedFileSetInit(fileset, NULL);
+ FileSetInit(fileset);
MemoryContextSwitchTo(savectx);
- stream_fd = BufFileCreateShared(fileset, path);
+ stream_fd = BufFileCreateFileSet(fileset, path);
/* Remember the fileset for the next stream of the same transaction */
ent->xid = xid;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
- stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}
diff --git a/src/backend/storage/file/Makefile b/src/backend/storage/file/Makefile
index 5e1291bf2d5..660ac51807e 100644
--- a/src/backend/storage/file/Makefile
+++ b/src/backend/storage/file/Makefile
@@ -16,6 +16,7 @@ OBJS = \
buffile.o \
copydir.o \
fd.o \
+ fileset.o \
reinit.o \
sharedfileset.o
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index a4be5fe5135..5e5409d84d9 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -39,7 +39,7 @@
* BufFile also supports temporary files that can be used by the single backend
* when the corresponding files need to be survived across the transaction and
* need to be opened and closed multiple times. Such files need to be created
- * as a member of a SharedFileSet.
+ * as a member of a FileSet.
*-------------------------------------------------------------------------
*/
@@ -77,8 +77,8 @@ struct BufFile
bool dirty; /* does buffer need to be written? */
bool readOnly; /* has the file been set to read only? */
- SharedFileSet *fileset; /* space for segment files if shared */
- const char *name; /* name of this BufFile if shared */
+ FileSet *fileset; /* space for fileset based segment files */
+ const char *name; /* name of fileset based BufFile */
/*
* resowner is the ResourceOwner to use for underlying temp files. (We
@@ -104,7 +104,7 @@ static void extendBufFile(BufFile *file);
static void BufFileLoadBuffer(BufFile *file);
static void BufFileDumpBuffer(BufFile *file);
static void BufFileFlush(BufFile *file);
-static File MakeNewSharedSegment(BufFile *file, int segment);
+static File MakeNewFileSetSegment(BufFile *file, int segment);
/*
* Create BufFile and perform the common initialization.
@@ -160,7 +160,7 @@ extendBufFile(BufFile *file)
if (file->fileset == NULL)
pfile = OpenTemporaryFile(file->isInterXact);
else
- pfile = MakeNewSharedSegment(file, file->numFiles);
+ pfile = MakeNewFileSetSegment(file, file->numFiles);
Assert(pfile >= 0);
@@ -214,34 +214,34 @@ BufFileCreateTemp(bool interXact)
* Build the name for a given segment of a given BufFile.
*/
static void
-SharedSegmentName(char *name, const char *buffile_name, int segment)
+FileSetSegmentName(char *name, const char *buffile_name, int segment)
{
snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
}
/*
- * Create a new segment file backing a shared BufFile.
+ * Create a new segment file backing a fileset based BufFile.
*/
static File
-MakeNewSharedSegment(BufFile *buffile, int segment)
+MakeNewFileSetSegment(BufFile *buffile, int segment)
{
char name[MAXPGPATH];
File file;
/*
* It is possible that there are files left over from before a crash
- * restart with the same name. In order for BufFileOpenShared() not to
+ * restart with the same name. In order for BufFileOpenFileSet() not to
* get confused about how many segments there are, we'll unlink the next
* segment number if it already exists.
*/
- SharedSegmentName(name, buffile->name, segment + 1);
- SharedFileSetDelete(buffile->fileset, name, true);
+ FileSetSegmentName(name, buffile->name, segment + 1);
+ FileSetDelete(buffile->fileset, name, true);
/* Create the new segment. */
- SharedSegmentName(name, buffile->name, segment);
- file = SharedFileSetCreate(buffile->fileset, name);
+ FileSetSegmentName(name, buffile->name, segment);
+ file = FileSetCreate(buffile->fileset, name);
- /* SharedFileSetCreate would've errored out */
+ /* FileSetCreate would've errored out */
Assert(file > 0);
return file;
@@ -251,15 +251,15 @@ MakeNewSharedSegment(BufFile *buffile, int segment)
* Create a BufFile that can be discovered and opened read-only by other
* backends that are attached to the same SharedFileSet using the same name.
*
- * The naming scheme for shared BufFiles is left up to the calling code. The
- * name will appear as part of one or more filenames on disk, and might
+ * The naming scheme for fileset based BufFiles is left up to the calling code.
+ * The name will appear as part of one or more filenames on disk, and might
* provide clues to administrators about which subsystem is generating
* temporary file data. Since each SharedFileSet object is backed by one or
* more uniquely named temporary directory, names don't conflict with
* unrelated SharedFileSet objects.
*/
BufFile *
-BufFileCreateShared(SharedFileSet *fileset, const char *name)
+BufFileCreateFileSet(FileSet *fileset, const char *name)
{
BufFile *file;
@@ -267,7 +267,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
file->fileset = fileset;
file->name = pstrdup(name);
file->files = (File *) palloc(sizeof(File));
- file->files[0] = MakeNewSharedSegment(file, 0);
+ file->files[0] = MakeNewFileSetSegment(file, 0);
file->readOnly = false;
return file;
@@ -275,13 +275,13 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
/*
* Open a file that was previously created in another backend (or this one)
- * with BufFileCreateShared in the same SharedFileSet using the same name.
+ * with BufFileCreateFileSet in the same FileSet using the same name.
* The backend that created the file must have called BufFileClose() or
- * BufFileExportShared() to make sure that it is ready to be opened by other
+ * BufFileExportFileSet() to make sure that it is ready to be opened by other
* backends and render it read-only.
*/
BufFile *
-BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
+BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
{
BufFile *file;
char segment_name[MAXPGPATH];
@@ -304,8 +304,8 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
files = repalloc(files, sizeof(File) * capacity);
}
/* Try to load a segment. */
- SharedSegmentName(segment_name, name, nfiles);
- files[nfiles] = SharedFileSetOpen(fileset, segment_name, mode);
+ FileSetSegmentName(segment_name, name, nfiles);
+ files[nfiles] = FileSetOpen(fileset, segment_name, mode);
if (files[nfiles] <= 0)
break;
++nfiles;
@@ -333,18 +333,18 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name, int mode)
}
/*
- * Delete a BufFile that was created by BufFileCreateShared in the given
- * SharedFileSet using the given name.
+ * Delete a BufFile that was created by BufFileCreateFileSet in the given
+ * FileSet using the given name.
*
* It is not necessary to delete files explicitly with this function. It is
* provided only as a way to delete files proactively, rather than waiting for
- * the SharedFileSet to be cleaned up.
+ * the FileSet to be cleaned up.
*
* Only one backend should attempt to delete a given name, and should know
* that it exists and has been exported or closed.
*/
void
-BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+BufFileDeleteFileSet(FileSet *fileset, const char *name)
{
char segment_name[MAXPGPATH];
int segment = 0;
@@ -357,8 +357,8 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name)
*/
for (;;)
{
- SharedSegmentName(segment_name, name, segment);
- if (!SharedFileSetDelete(fileset, segment_name, true))
+ FileSetSegmentName(segment_name, name, segment);
+ if (!FileSetDelete(fileset, segment_name, true))
break;
found = true;
++segment;
@@ -367,16 +367,16 @@ BufFileDeleteShared(SharedFileSet *fileset, const char *name)
}
if (!found)
- elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name);
+ elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
}
/*
- * BufFileExportShared --- flush and make read-only, in preparation for sharing.
+ * BufFileExportFileSet --- flush and make read-only, in preparation for sharing.
*/
void
-BufFileExportShared(BufFile *file)
+BufFileExportFileSet(BufFile *file)
{
- /* Must be a file belonging to a SharedFileSet. */
+ /* Must be a file belonging to a FileSet. */
Assert(file->fileset != NULL);
/* It's probably a bug if someone calls this twice. */
@@ -785,7 +785,7 @@ BufFileTellBlock(BufFile *file)
#endif
/*
- * Return the current shared BufFile size.
+ * Return the current fileset based BufFile size.
*
* Counts any holes left behind by BufFileAppend as part of the size.
* ereport()s on failure.
@@ -811,8 +811,8 @@ BufFileSize(BufFile *file)
}
/*
- * Append the contents of source file (managed within shared fileset) to
- * end of target file (managed within same shared fileset).
+ * Append the contents of source file (managed within fileset) to
+ * end of target file (managed within same fileset).
*
* Note that operation subsumes ownership of underlying resources from
* "source". Caller should never call BufFileClose against source having
@@ -854,11 +854,11 @@ BufFileAppend(BufFile *target, BufFile *source)
}
/*
- * Truncate a BufFile created by BufFileCreateShared up to the given fileno and
- * the offset.
+ * Truncate a BufFile created by BufFileCreateFileSet up to the given fileno
+ * and the offset.
*/
void
-BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
+BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
{
int numFiles = file->numFiles;
int newFile = fileno;
@@ -876,12 +876,12 @@ BufFileTruncateShared(BufFile *file, int fileno, off_t offset)
{
if ((i != fileno || offset == 0) && i != 0)
{
- SharedSegmentName(segment_name, file->name, i);
+ FileSetSegmentName(segment_name, file->name, i);
FileClose(file->files[i]);
- if (!SharedFileSetDelete(file->fileset, segment_name, true))
+ if (!FileSetDelete(file->fileset, segment_name, true))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not delete shared fileset \"%s\": %m",
+ errmsg("could not delete fileset \"%s\": %m",
segment_name)));
numFiles--;
newOffset = MAX_PHYSICAL_FILESIZE;
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index b58b3998345..433e2832a54 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -1921,7 +1921,7 @@ PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
/*
* Unlike FileClose's automatic file deletion code, we tolerate
- * non-existence to support BufFileDeleteShared which doesn't know how
+ * non-existence to support BufFileDeleteFileSet which doesn't know how
* many segments it has to delete until it runs out.
*/
if (stat_errno == ENOENT)
diff --git a/src/backend/storage/file/fileset.c b/src/backend/storage/file/fileset.c
new file mode 100644
index 00000000000..282ff12b854
--- /dev/null
+++ b/src/backend/storage/file/fileset.c
@@ -0,0 +1,205 @@
+/*-------------------------------------------------------------------------
+ *
+ * fileset.c
+ * Management of named temporary files.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/file/fileset.c
+ *
+ * FileSets provide a temporary namespace (think directory) so that files can
+ * be discovered by name.
+ *
+ * FileSets can be used by backends when the temporary files need to be
+ * opened/closed multiple times and the underlying files need to survive across
+ * transactions.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <limits.h>
+
+#include "catalog/pg_tablespace.h"
+#include "commands/tablespace.h"
+#include "common/hashfn.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/fileset.h"
+#include "utils/builtins.h"
+
+static void FileSetPath(char *path, FileSet *fileset, Oid tablespace);
+static void FilePath(char *path, FileSet *fileset, const char *name);
+static Oid ChooseTablespace(const FileSet *fileset, const char *name);
+
+/*
+ * Initialize a space for temporary files. This API can be used by shared
+ * fileset as well as if the temporary files are used only by single backend
+ * but the files need to be opened and closed multiple times and also the
+ * underlying files need to survive across transactions.
+ *
+ * The callers are expected to explicitly remove such files by using
+ * FileSetDelete/FileSetDeleteAll.
+ *
+ * Files will be distributed over the tablespaces configured in
+ * temp_tablespaces.
+ *
+ * Under the covers the set is one or more directories which will eventually
+ * be deleted.
+ */
+void
+FileSetInit(FileSet *fileset)
+{
+ static uint32 counter = 0;
+
+ fileset->creator_pid = MyProcPid;
+ fileset->number = counter;
+ counter = (counter + 1) % INT_MAX;
+
+ /* Capture the tablespace OIDs so that all backends agree on them. */
+ PrepareTempTablespaces();
+ fileset->ntablespaces =
+ GetTempTablespaces(&fileset->tablespaces[0],
+ lengthof(fileset->tablespaces));
+ if (fileset->ntablespaces == 0)
+ {
+ /* If the GUC is empty, use current database's default tablespace */
+ fileset->tablespaces[0] = MyDatabaseTableSpace;
+ fileset->ntablespaces = 1;
+ }
+ else
+ {
+ int i;
+
+ /*
+ * An entry of InvalidOid means use the default tablespace for the
+ * current database. Replace that now, to be sure that all users of
+ * the FileSet agree on what to do.
+ */
+ for (i = 0; i < fileset->ntablespaces; i++)
+ {
+ if (fileset->tablespaces[i] == InvalidOid)
+ fileset->tablespaces[i] = MyDatabaseTableSpace;
+ }
+ }
+}
+
+/*
+ * Create a new file in the given set.
+ */
+File
+FileSetCreate(FileSet *fileset, const char *name)
+{
+ char path[MAXPGPATH];
+ File file;
+
+ FilePath(path, fileset, name);
+ file = PathNameCreateTemporaryFile(path, false);
+
+ /* If we failed, see if we need to create the directory on demand. */
+ if (file <= 0)
+ {
+ char tempdirpath[MAXPGPATH];
+ char filesetpath[MAXPGPATH];
+ Oid tablespace = ChooseTablespace(fileset, name);
+
+ TempTablespacePath(tempdirpath, tablespace);
+ FileSetPath(filesetpath, fileset, tablespace);
+ PathNameCreateTemporaryDir(tempdirpath, filesetpath);
+ file = PathNameCreateTemporaryFile(path, true);
+ }
+
+ return file;
+}
+
+/*
+ * Open a file that was created with FileSetCreate() */
+File
+FileSetOpen(FileSet *fileset, const char *name, int mode)
+{
+ char path[MAXPGPATH];
+ File file;
+
+ FilePath(path, fileset, name);
+ file = PathNameOpenTemporaryFile(path, mode);
+
+ return file;
+}
+
+/*
+ * Delete a file that was created with FileSetCreate().
+ *
+ * Return true if the file existed, false if didn't.
+ */
+bool
+FileSetDelete(FileSet *fileset, const char *name,
+ bool error_on_failure)
+{
+ char path[MAXPGPATH];
+
+ FilePath(path, fileset, name);
+
+ return PathNameDeleteTemporaryFile(path, error_on_failure);
+}
+
+/*
+ * Delete all files in the set.
+ */
+void
+FileSetDeleteAll(FileSet *fileset)
+{
+ char dirpath[MAXPGPATH];
+ int i;
+
+ /*
+ * Delete the directory we created in each tablespace. Doesn't fail
+ * because we use this in error cleanup paths, but can generate LOG
+ * message on IO error.
+ */
+ for (i = 0; i < fileset->ntablespaces; ++i)
+ {
+ FileSetPath(dirpath, fileset, fileset->tablespaces[i]);
+ PathNameDeleteTemporaryDir(dirpath);
+ }
+}
+
+/*
+ * Build the path for the directory holding the files backing a FileSet in a
+ * given tablespace.
+ */
+static void
+FileSetPath(char *path, FileSet *fileset, Oid tablespace)
+{
+ char tempdirpath[MAXPGPATH];
+
+ TempTablespacePath(tempdirpath, tablespace);
+ snprintf(path, MAXPGPATH, "%s/%s%lu.%u.fileset",
+ tempdirpath, PG_TEMP_FILE_PREFIX,
+ (unsigned long) fileset->creator_pid, fileset->number);
+}
+
+/*
+ * Sorting has to determine which tablespace a given temporary file belongs in.
+ */
+static Oid
+ChooseTablespace(const FileSet *fileset, const char *name)
+{
+ uint32 hash = hash_any((const unsigned char *) name, strlen(name));
+
+ return fileset->tablespaces[hash % fileset->ntablespaces];
+}
+
+/*
+ * Compute the full path of a file in a FileSet.
+ */
+static void
+FilePath(char *path, FileSet *fileset, const char *name)
+{
+ char dirpath[MAXPGPATH];
+
+ FileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+ snprintf(path, MAXPGPATH, "%s/%s", dirpath, name);
+}
diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
index ed37c940adc..6a33fac4353 100644
--- a/src/backend/storage/file/sharedfileset.c
+++ b/src/backend/storage/file/sharedfileset.c
@@ -13,10 +13,6 @@
* files can be discovered by name, and a shared ownership semantics so that
* shared files survive until the last user detaches.
*
- * SharedFileSets can be used by backends when the temporary files need to be
- * opened/closed multiple times and the underlying files need to survive across
- * transactions.
- *
*-------------------------------------------------------------------------
*/
@@ -33,13 +29,7 @@
#include "storage/sharedfileset.h"
#include "utils/builtins.h"
-static List *filesetlist = NIL;
-
static void SharedFileSetOnDetach(dsm_segment *segment, Datum datum);
-static void SharedFileSetDeleteOnProcExit(int status, Datum arg);
-static void SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace);
-static void SharedFilePath(char *path, SharedFileSet *fileset, const char *name);
-static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name);
/*
* Initialize a space for temporary files that can be opened by other backends.
@@ -47,77 +37,22 @@ static Oid ChooseTablespace(const SharedFileSet *fileset, const char *name);
* SharedFileSet with 'seg'. Any contained files will be deleted when the
* last backend detaches.
*
- * We can also use this interface if the temporary files are used only by
- * single backend but the files need to be opened and closed multiple times
- * and also the underlying files need to survive across transactions. For
- * such cases, dsm segment 'seg' should be passed as NULL. Callers are
- * expected to explicitly remove such files by using SharedFileSetDelete/
- * SharedFileSetDeleteAll or we remove such files on proc exit.
- *
- * Files will be distributed over the tablespaces configured in
- * temp_tablespaces.
- *
* Under the covers the set is one or more directories which will eventually
* be deleted.
*/
void
SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg)
{
- static uint32 counter = 0;
-
+ /* Initialize the shared fileset specific members. */
SpinLockInit(&fileset->mutex);
fileset->refcnt = 1;
- fileset->creator_pid = MyProcPid;
- fileset->number = counter;
- counter = (counter + 1) % INT_MAX;
-
- /* Capture the tablespace OIDs so that all backends agree on them. */
- PrepareTempTablespaces();
- fileset->ntablespaces =
- GetTempTablespaces(&fileset->tablespaces[0],
- lengthof(fileset->tablespaces));
- if (fileset->ntablespaces == 0)
- {
- /* If the GUC is empty, use current database's default tablespace */
- fileset->tablespaces[0] = MyDatabaseTableSpace;
- fileset->ntablespaces = 1;
- }
- else
- {
- int i;
- /*
- * An entry of InvalidOid means use the default tablespace for the
- * current database. Replace that now, to be sure that all users of
- * the SharedFileSet agree on what to do.
- */
- for (i = 0; i < fileset->ntablespaces; i++)
- {
- if (fileset->tablespaces[i] == InvalidOid)
- fileset->tablespaces[i] = MyDatabaseTableSpace;
- }
- }
+ /* Initialize the fileset. */
+ FileSetInit(&fileset->fs);
/* Register our cleanup callback. */
if (seg)
on_dsm_detach(seg, SharedFileSetOnDetach, PointerGetDatum(fileset));
- else
- {
- static bool registered_cleanup = false;
-
- if (!registered_cleanup)
- {
- /*
- * We must not have registered any fileset before registering the
- * fileset clean up.
- */
- Assert(filesetlist == NIL);
- on_proc_exit(SharedFileSetDeleteOnProcExit, 0);
- registered_cleanup = true;
- }
-
- filesetlist = lcons((void *) fileset, filesetlist);
- }
}
/*
@@ -148,86 +83,12 @@ SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
}
/*
- * Create a new file in the given set.
- */
-File
-SharedFileSetCreate(SharedFileSet *fileset, const char *name)
-{
- char path[MAXPGPATH];
- File file;
-
- SharedFilePath(path, fileset, name);
- file = PathNameCreateTemporaryFile(path, false);
-
- /* If we failed, see if we need to create the directory on demand. */
- if (file <= 0)
- {
- char tempdirpath[MAXPGPATH];
- char filesetpath[MAXPGPATH];
- Oid tablespace = ChooseTablespace(fileset, name);
-
- TempTablespacePath(tempdirpath, tablespace);
- SharedFileSetPath(filesetpath, fileset, tablespace);
- PathNameCreateTemporaryDir(tempdirpath, filesetpath);
- file = PathNameCreateTemporaryFile(path, true);
- }
-
- return file;
-}
-
-/*
- * Open a file that was created with SharedFileSetCreate(), possibly in
- * another backend.
- */
-File
-SharedFileSetOpen(SharedFileSet *fileset, const char *name, int mode)
-{
- char path[MAXPGPATH];
- File file;
-
- SharedFilePath(path, fileset, name);
- file = PathNameOpenTemporaryFile(path, mode);
-
- return file;
-}
-
-/*
- * Delete a file that was created with SharedFileSetCreate().
- * Return true if the file existed, false if didn't.
- */
-bool
-SharedFileSetDelete(SharedFileSet *fileset, const char *name,
- bool error_on_failure)
-{
- char path[MAXPGPATH];
-
- SharedFilePath(path, fileset, name);
-
- return PathNameDeleteTemporaryFile(path, error_on_failure);
-}
-
-/*
* Delete all files in the set.
*/
void
SharedFileSetDeleteAll(SharedFileSet *fileset)
{
- char dirpath[MAXPGPATH];
- int i;
-
- /*
- * Delete the directory we created in each tablespace. Doesn't fail
- * because we use this in error cleanup paths, but can generate LOG
- * message on IO error.
- */
- for (i = 0; i < fileset->ntablespaces; ++i)
- {
- SharedFileSetPath(dirpath, fileset, fileset->tablespaces[i]);
- PathNameDeleteTemporaryDir(dirpath);
- }
-
- /* Unregister the shared fileset */
- SharedFileSetUnregister(fileset);
+ FileSetDeleteAll(&fileset->fs);
}
/*
@@ -255,100 +116,5 @@ SharedFileSetOnDetach(dsm_segment *segment, Datum datum)
* this function so we can safely access its data.
*/
if (unlink_all)
- SharedFileSetDeleteAll(fileset);
-}
-
-/*
- * Callback function that will be invoked on the process exit. This will
- * process the list of all the registered sharedfilesets and delete the
- * underlying files.
- */
-static void
-SharedFileSetDeleteOnProcExit(int status, Datum arg)
-{
- /*
- * Remove all the pending shared fileset entries. We don't use foreach()
- * here because SharedFileSetDeleteAll will remove the current element in
- * filesetlist. Though we have used foreach_delete_current() to remove the
- * element from filesetlist it could only fix up the state of one of the
- * loops, see SharedFileSetUnregister.
- */
- while (list_length(filesetlist) > 0)
- {
- SharedFileSet *fileset = (SharedFileSet *) linitial(filesetlist);
-
- SharedFileSetDeleteAll(fileset);
- }
-
- filesetlist = NIL;
-}
-
-/*
- * Unregister the shared fileset entry registered for cleanup on proc exit.
- */
-void
-SharedFileSetUnregister(SharedFileSet *input_fileset)
-{
- ListCell *l;
-
- /*
- * If the caller is following the dsm based cleanup then we don't maintain
- * the filesetlist so return.
- */
- if (filesetlist == NIL)
- return;
-
- foreach(l, filesetlist)
- {
- SharedFileSet *fileset = (SharedFileSet *) lfirst(l);
-
- /* Remove the entry from the list */
- if (input_fileset == fileset)
- {
- filesetlist = foreach_delete_current(filesetlist, l);
- return;
- }
- }
-
- /* Should have found a match */
- Assert(false);
-}
-
-/*
- * Build the path for the directory holding the files backing a SharedFileSet
- * in a given tablespace.
- */
-static void
-SharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
-{
- char tempdirpath[MAXPGPATH];
-
- TempTablespacePath(tempdirpath, tablespace);
- snprintf(path, MAXPGPATH, "%s/%s%lu.%u.sharedfileset",
- tempdirpath, PG_TEMP_FILE_PREFIX,
- (unsigned long) fileset->creator_pid, fileset->number);
-}
-
-/*
- * Sorting hat to determine which tablespace a given shared temporary file
- * belongs in.
- */
-static Oid
-ChooseTablespace(const SharedFileSet *fileset, const char *name)
-{
- uint32 hash = hash_any((const unsigned char *) name, strlen(name));
-
- return fileset->tablespaces[hash % fileset->ntablespaces];
-}
-
-/*
- * Compute the full path of a file in a SharedFileSet.
- */
-static void
-SharedFilePath(char *path, SharedFileSet *fileset, const char *name)
-{
- char dirpath[MAXPGPATH];
-
- SharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
- snprintf(path, MAXPGPATH, "%s/%s", dirpath, name);
+ FileSetDeleteAll(&fileset->fs);
}
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index cafc0872549..f7994d771d6 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
lt = &lts->tapes[i];
pg_itoa(i, filename);
- file = BufFileOpenShared(fileset, filename, O_RDONLY);
+ file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
filesize = BufFileSize(file);
/*
@@ -610,7 +610,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
* offset).
*
* The only thing that currently prevents writing to the leader tape from
- * working is the fact that BufFiles opened using BufFileOpenShared() are
+ * working is the fact that BufFiles opened using BufFileOpenFileSet() are
* read-only by definition, but that could be changed if it seemed
* worthwhile. For now, writing to the leader tape will raise a "Bad file
* descriptor" error, so tuplesort must avoid writing to the leader tape
@@ -722,7 +722,7 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
char filename[MAXPGPATH];
pg_itoa(worker, filename);
- lts->pfile = BufFileCreateShared(fileset, filename);
+ lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
}
else
lts->pfile = BufFileCreateTemp(false);
@@ -1096,7 +1096,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
/* Handle extra steps when caller is to share its tapeset */
if (share)
{
- BufFileExportShared(lts->pfile);
+ BufFileExportFileSet(lts->pfile);
share->firstblocknumber = lt->firstBlockNumber;
}
}
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 57e35db4f8d..504ef1c2869 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -310,7 +310,8 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
/* Create one. Only this backend will write into it. */
sts_filename(name, accessor, accessor->participant);
- accessor->write_file = BufFileCreateShared(accessor->fileset, name);
+ accessor->write_file =
+ BufFileCreateFileSet(&accessor->fileset->fs, name);
/* Set up the shared state for this backend's file. */
participant = &accessor->sts->participants[accessor->participant];
@@ -559,7 +560,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
sts_filename(name, accessor, accessor->read_participant);
accessor->read_file =
- BufFileOpenShared(accessor->fileset, name, O_RDONLY);
+ BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
}
/* Seek and load the chunk header. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 41c7487393f..a6c9d4e2a10 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -79,6 +79,7 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
+extern void logicalrep_worker_cleanupfileset(void);
extern int logicalrep_sync_worker_count(Oid subid);
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 566523de1fa..143eada85fe 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -26,7 +26,7 @@
#ifndef BUFFILE_H
#define BUFFILE_H
-#include "storage/sharedfileset.h"
+#include "storage/fileset.h"
/* BufFile is an opaque type whose details are not known outside buffile.c. */
@@ -46,11 +46,11 @@ extern int BufFileSeekBlock(BufFile *file, long blknum);
extern int64 BufFileSize(BufFile *file);
extern long BufFileAppend(BufFile *target, BufFile *source);
-extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
-extern void BufFileExportShared(BufFile *file);
-extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name,
- int mode);
-extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
-extern void BufFileTruncateShared(BufFile *file, int fileno, off_t offset);
+extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
+extern void BufFileExportFileSet(BufFile *file);
+extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
+ int mode);
+extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
+extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
#endif /* BUFFILE_H */
diff --git a/src/include/storage/fileset.h b/src/include/storage/fileset.h
new file mode 100644
index 00000000000..be0e0978345
--- /dev/null
+++ b/src/include/storage/fileset.h
@@ -0,0 +1,40 @@
+/*-------------------------------------------------------------------------
+ *
+ * fileset.h
+ * Management of named temporary files.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/fileset.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef FILESET_H
+#define FILESET_H
+
+#include "storage/fd.h"
+
+/*
+ * A set of temporary files.
+ */
+typedef struct FileSet
+{
+ pid_t creator_pid; /* PID of the creating process */
+ uint32 number; /* per-PID identifier */
+ int ntablespaces; /* number of tablespaces to use */
+ Oid tablespaces[8]; /* OIDs of tablespaces to use. Assumes that
+ * it's rare that there more than temp
+ * tablespaces. */
+} FileSet;
+
+extern void FileSetInit(FileSet *fileset);
+extern File FileSetCreate(FileSet *fileset, const char *name);
+extern File FileSetOpen(FileSet *fileset, const char *name,
+ int mode);
+extern bool FileSetDelete(FileSet *fileset, const char *name,
+ bool error_on_failure);
+extern void FileSetDeleteAll(FileSet *fileset);
+
+#endif
diff --git a/src/include/storage/sharedfileset.h b/src/include/storage/sharedfileset.h
index 09ba121aafa..59becfbef82 100644
--- a/src/include/storage/sharedfileset.h
+++ b/src/include/storage/sharedfileset.h
@@ -17,6 +17,7 @@
#include "storage/dsm.h"
#include "storage/fd.h"
+#include "storage/fileset.h"
#include "storage/spin.h"
/*
@@ -24,24 +25,13 @@
*/
typedef struct SharedFileSet
{
- pid_t creator_pid; /* PID of the creating process */
- uint32 number; /* per-PID identifier */
+ FileSet fs;
slock_t mutex; /* mutex protecting the reference count */
int refcnt; /* number of attached backends */
- int ntablespaces; /* number of tablespaces to use */
- Oid tablespaces[8]; /* OIDs of tablespaces to use. Assumes that
- * it's rare that there more than temp
- * tablespaces. */
} SharedFileSet;
extern void SharedFileSetInit(SharedFileSet *fileset, dsm_segment *seg);
extern void SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg);
-extern File SharedFileSetCreate(SharedFileSet *fileset, const char *name);
-extern File SharedFileSetOpen(SharedFileSet *fileset, const char *name,
- int mode);
-extern bool SharedFileSetDelete(SharedFileSet *fileset, const char *name,
- bool error_on_failure);
extern void SharedFileSetDeleteAll(SharedFileSet *fileset);
-extern void SharedFileSetUnregister(SharedFileSet *input_fileset);
#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 621d0cb4dac..f31a1e4e1ec 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -709,6 +709,7 @@ File
FileFdwExecutionState
FileFdwPlanState
FileNameMap
+FileSet
FileTag
FinalPathExtraData
FindColsContext