aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c82
1 files changed, 53 insertions, 29 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 295b1e06de7..bfb7d1a261c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -39,13 +39,13 @@
* BufFile infrastructure supports temporary files that exceed the OS file size
* limit, (b) provides a way for automatic clean up on the error and (c) provides
* a way to survive these files across local transactions and allow to open and
- * close at stream start and close. We decided to use SharedFileSet
+ * close at stream start and close. We decided to use FileSet
* infrastructure as without that it deletes the files on the closure of the
* file and if we decide to keep stream files open across the start/stop stream
* then it will consume a lot of memory (more than 8K for each BufFile and
* there could be multiple such BufFiles as the subscriber could receive
* multiple start/stop streams for different transactions before getting the
- * commit). Moreover, if we don't use SharedFileSet then we also need to invent
+ * commit). Moreover, if we don't use FileSet then we also need to invent
* a new way to pass filenames to BufFile APIs so that we are allowed to open
* the file we desired across multiple stream-open calls for the same
* transaction.
@@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
typedef struct StreamXidHash
{
TransactionId xid; /* xid is the hash key and must be first */
- SharedFileSet *stream_fileset; /* shared file set for stream data */
- SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+ FileSet *stream_fileset; /* file set for stream data */
+ FileSet *subxact_fileset; /* file set for subxact info */
} StreamXidHash;
static MemoryContext ApplyMessageContext = NULL;
@@ -270,8 +270,8 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
/*
- * Hash table for storing the streaming xid information along with shared file
- * set for streaming and subxact files.
+ * Hash table for storing the streaming xid information along with filesets
+ * for streaming and subxact files.
*/
static HTAB *xidhash = NULL;
@@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s)
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
- fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
/* OK, truncate the file at the right offset */
- BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
- subxact_data.subxacts[subidx].offset);
+ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
+ subxact_data.subxacts[subidx].offset);
BufFileClose(fd);
/* discard the subxacts added later */
@@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
errmsg_internal("transaction %u not found in stream XID hash table",
xid)));
- fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -2542,6 +2542,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
/*
+ * Cleanup filesets.
+ */
+void
+logicalrep_worker_cleanupfileset(void)
+{
+ HASH_SEQ_STATUS status;
+ StreamXidHash *hentry;
+
+ /* Remove all the pending stream and subxact filesets. */
+ if (xidhash)
+ {
+ hash_seq_init(&status, xidhash);
+ while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
+ {
+ FileSetDeleteAll(hentry->stream_fileset);
+
+ /* Delete the subxact fileset iff it is created. */
+ if (hentry->subxact_fileset)
+ FileSetDeleteAll(hentry->subxact_fileset);
+ }
+ }
+}
+
+/*
* Apply main loop.
*/
static void
@@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid)
if (ent->subxact_fileset)
{
cleanup_subxact_info();
- SharedFileSetDeleteAll(ent->subxact_fileset);
+ FileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
@@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid)
MemoryContext oldctx;
/*
- * We need to maintain shared fileset across multiple stream
- * start/stop calls. So, need to allocate it in a persistent context.
+ * We need to maintain fileset across multiple stream start/stop
+ * calls. So, need to allocate it in a persistent context.
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
- ent->subxact_fileset = palloc(sizeof(SharedFileSet));
- SharedFileSetInit(ent->subxact_fileset, NULL);
+ ent->subxact_fileset = palloc(sizeof(FileSet));
+ FileSetInit(ent->subxact_fileset);
MemoryContextSwitchTo(oldctx);
- fd = BufFileCreateShared(ent->subxact_fileset, path);
+ fd = BufFileCreateFileSet(ent->subxact_fileset, path);
}
else
- fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
@@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid)
subxact_filename(path, subid, xid);
- fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3264,7 +3288,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
/* Delete the change file and release the stream fileset memory */
changes_filename(path, subid, xid);
- SharedFileSetDeleteAll(ent->stream_fileset);
+ FileSetDeleteAll(ent->stream_fileset);
pfree(ent->stream_fileset);
ent->stream_fileset = NULL;
@@ -3272,7 +3296,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
if (ent->subxact_fileset)
{
subxact_filename(path, subid, xid);
- SharedFileSetDeleteAll(ent->subxact_fileset);
+ FileSetDeleteAll(ent->subxact_fileset);
pfree(ent->subxact_fileset);
ent->subxact_fileset = NULL;
}
@@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the shared fileset and create the
- * buffile, otherwise open the previously created file.
+ * changes for this transaction, initialize the fileset and create the buffile,
+ * otherwise open the previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
@@ -3330,7 +3354,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
if (first_segment)
{
MemoryContext savectx;
- SharedFileSet *fileset;
+ FileSet *fileset;
if (found)
ereport(ERROR,
@@ -3338,16 +3362,16 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
/*
- * We need to maintain shared fileset across multiple stream
- * start/stop calls. So, need to allocate it in a persistent context.
+ * We need to maintain fileset across multiple stream start/stop
+ * calls. So, need to allocate it in a persistent context.
*/
savectx = MemoryContextSwitchTo(ApplyContext);
- fileset = palloc(sizeof(SharedFileSet));
+ fileset = palloc(sizeof(FileSet));
- SharedFileSetInit(fileset, NULL);
+ FileSetInit(fileset);
MemoryContextSwitchTo(savectx);
- stream_fd = BufFileCreateShared(fileset, path);
+ stream_fd = BufFileCreateFileSet(fileset, path);
/* Remember the fileset for the next stream of the same transaction */
ent->xid = xid;
@@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
- stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}