diff options
Diffstat (limited to 'src/backend/storage/file/buffile.c')
-rw-r--r-- | src/backend/storage/file/buffile.c | 205 |
1 files changed, 204 insertions, 1 deletions
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index 06bf2fadbf1..fa9940da9b3 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -31,12 +31,18 @@ * BufFile also supports temporary files that exceed the OS file size limit * (by opening multiple fd.c temporary files). This is an essential feature * for sorts and hashjoins on large amounts of data. + * + * BufFile supports temporary files that can be made read-only and shared with + * other backends, as infrastructure for parallel execution. Such files need + * to be created as a member of a SharedFileSet that all participants are + * attached to. *------------------------------------------------------------------------- */ #include "postgres.h" #include "executor/instrument.h" +#include "miscadmin.h" #include "pgstat.h" #include "storage/fd.h" #include "storage/buffile.h" @@ -70,6 +76,10 @@ struct BufFile bool isInterXact; /* keep open over transactions? */ bool dirty; /* does buffer need to be written? */ + bool readOnly; /* has the file been set to read only? */ + + SharedFileSet *fileset; /* space for segment files if shared */ + const char *name; /* name of this BufFile if shared */ /* * resowner is the ResourceOwner to use for underlying temp files. (We @@ -94,6 +104,7 @@ static void extendBufFile(BufFile *file); static void BufFileLoadBuffer(BufFile *file); static void BufFileDumpBuffer(BufFile *file); static int BufFileFlush(BufFile *file); +static File MakeNewSharedSegment(BufFile *file, int segment); /* @@ -117,6 +128,9 @@ makeBufFile(File firstfile) file->curOffset = 0L; file->pos = 0; file->nbytes = 0; + file->readOnly = false; + file->fileset = NULL; + file->name = NULL; return file; } @@ -134,7 +148,11 @@ extendBufFile(BufFile *file) oldowner = CurrentResourceOwner; CurrentResourceOwner = file->resowner; - pfile = OpenTemporaryFile(file->isInterXact); + if (file->fileset == NULL) + pfile = OpenTemporaryFile(file->isInterXact); + else + pfile = MakeNewSharedSegment(file, file->numFiles); + Assert(pfile >= 0); CurrentResourceOwner = oldowner; @@ -176,6 +194,189 @@ BufFileCreateTemp(bool interXact) } /* + * Build the name for a given segment of a given BufFile. + */ +static void +SharedSegmentName(char *name, const char *buffile_name, int segment) +{ + snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment); +} + +/* + * Create a new segment file backing a shared BufFile. + */ +static File +MakeNewSharedSegment(BufFile *buffile, int segment) +{ + char name[MAXPGPATH]; + File file; + + SharedSegmentName(name, buffile->name, segment); + file = SharedFileSetCreate(buffile->fileset, name); + + /* SharedFileSetCreate would've errored out */ + Assert(file > 0); + + return file; +} + +/* + * Create a BufFile that can be discovered and opened read-only by other + * backends that are attached to the same SharedFileSet using the same name. + * + * The naming scheme for shared BufFiles is left up to the calling code. The + * name will appear as part of one or more filenames on disk, and might + * provide clues to administrators about which subsystem is generating + * temporary file data. Since each SharedFileSet object is backed by one or + * more uniquely named temporary directory, names don't conflict with + * unrelated SharedFileSet objects. + */ +BufFile * +BufFileCreateShared(SharedFileSet *fileset, const char *name) +{ + BufFile *file; + + file = (BufFile *) palloc(sizeof(BufFile)); + file->fileset = fileset; + file->name = pstrdup(name); + file->numFiles = 1; + file->files = (File *) palloc(sizeof(File)); + file->files[0] = MakeNewSharedSegment(file, 0); + file->offsets = (off_t *) palloc(sizeof(off_t)); + file->offsets[0] = 0L; + file->isInterXact = false; + file->dirty = false; + file->resowner = CurrentResourceOwner; + file->curFile = 0; + file->curOffset = 0L; + file->pos = 0; + file->nbytes = 0; + file->readOnly = false; + file->name = pstrdup(name); + + return file; +} + +/* + * Open a file that was previously created in another backend (or this one) + * with BufFileCreateShared in the same SharedFileSet using the same name. + * The backend that created the file must have called BufFileClose() or + * BufFileExport() to make sure that it is ready to be opened by other + * backends and render it read-only. + */ +BufFile * +BufFileOpenShared(SharedFileSet *fileset, const char *name) +{ + BufFile *file = (BufFile *) palloc(sizeof(BufFile)); + char segment_name[MAXPGPATH]; + Size capacity = 16; + File *files = palloc(sizeof(File) * capacity); + int nfiles = 0; + + file = (BufFile *) palloc(sizeof(BufFile)); + files = palloc(sizeof(File) * capacity); + + /* + * We don't know how many segments there are, so we'll probe the + * filesystem to find out. + */ + for (;;) + { + /* See if we need to expand our file segment array. */ + if (nfiles + 1 > capacity) + { + capacity *= 2; + files = repalloc(files, sizeof(File) * capacity); + } + /* Try to load a segment. */ + SharedSegmentName(segment_name, name, nfiles); + files[nfiles] = SharedFileSetOpen(fileset, segment_name); + if (files[nfiles] <= 0) + break; + ++nfiles; + + CHECK_FOR_INTERRUPTS(); + } + + /* + * If we didn't find any files at all, then no BufFile exists with this + * name. + */ + if (nfiles == 0) + return NULL; + + file->numFiles = nfiles; + file->files = files; + file->offsets = (off_t *) palloc0(sizeof(off_t) * nfiles); + file->isInterXact = false; + file->dirty = false; + file->resowner = CurrentResourceOwner; /* Unused, can't extend */ + file->curFile = 0; + file->curOffset = 0L; + file->pos = 0; + file->nbytes = 0; + file->readOnly = true; /* Can't write to files opened this way */ + file->fileset = fileset; + file->name = pstrdup(name); + + return file; +} + +/* + * Delete a BufFile that was created by BufFileCreateShared in the given + * SharedFileSet using the given name. + * + * It is not necessary to delete files explicitly with this function. It is + * provided only as a way to delete files proactively, rather than waiting for + * the SharedFileSet to be cleaned up. + * + * Only one backend should attempt to delete a given name, and should know + * that it exists and has been exported or closed. + */ +void +BufFileDeleteShared(SharedFileSet *fileset, const char *name) +{ + char segment_name[MAXPGPATH]; + int segment = 0; + bool found = false; + + /* + * We don't know how many segments the file has. We'll keep deleting + * until we run out. If we don't manage to find even an initial segment, + * raise an error. + */ + for (;;) + { + SharedSegmentName(segment_name, name, segment); + if (!SharedFileSetDelete(fileset, segment_name, true)) + break; + found = true; + ++segment; + + CHECK_FOR_INTERRUPTS(); + } + + if (!found) + elog(ERROR, "could not delete unknown shared BufFile \"%s\"", name); +} + +/* + * BufFileExportShared --- flush and make read-only, in preparation for sharing. + */ +void +BufFileExportShared(BufFile *file) +{ + /* Must be a file belonging to a SharedFileSet. */ + Assert(file->fileset != NULL); + + /* It's probably a bug if someone calls this twice. */ + Assert(!file->readOnly); + + BufFileFlush(file); + file->readOnly = true; +} + +/* * Close a BufFile * * Like fclose(), this also implicitly FileCloses the underlying File. @@ -390,6 +591,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size) size_t nwritten = 0; size_t nthistime; + Assert(!file->readOnly); + while (size > 0) { if (file->pos >= BLCKSZ) |