diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 82 |
1 files changed, 53 insertions, 29 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 295b1e06de7..bfb7d1a261c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -39,13 +39,13 @@ * BufFile infrastructure supports temporary files that exceed the OS file size * limit, (b) provides a way for automatic clean up on the error and (c) provides * a way to survive these files across local transactions and allow to open and - * close at stream start and close. We decided to use SharedFileSet + * close at stream start and close. We decided to use FileSet * infrastructure as without that it deletes the files on the closure of the * file and if we decide to keep stream files open across the start/stop stream * then it will consume a lot of memory (more than 8K for each BufFile and * there could be multiple such BufFiles as the subscriber could receive * multiple start/stop streams for different transactions before getting the - * commit). Moreover, if we don't use SharedFileSet then we also need to invent + * commit). Moreover, if we don't use FileSet then we also need to invent * a new way to pass filenames to BufFile APIs so that we are allowed to open * the file we desired across multiple stream-open calls for the same * transaction. @@ -246,8 +246,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg = typedef struct StreamXidHash { TransactionId xid; /* xid is the hash key and must be first */ - SharedFileSet *stream_fileset; /* shared file set for stream data */ - SharedFileSet *subxact_fileset; /* shared file set for subxact info */ + FileSet *stream_fileset; /* file set for stream data */ + FileSet *subxact_fileset; /* file set for subxact info */ } StreamXidHash; static MemoryContext ApplyMessageContext = NULL; @@ -270,8 +270,8 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; /* - * Hash table for storing the streaming xid information along with shared file - * set for streaming and subxact files. + * Hash table for storing the streaming xid information along with filesets + * for streaming and subxact files. */ static HTAB *xidhash = NULL; @@ -1297,11 +1297,11 @@ apply_handle_stream_abort(StringInfo s) /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); /* OK, truncate the file at the right offset */ - BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno, - subxact_data.subxacts[subidx].offset); + BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, + subxact_data.subxacts[subidx].offset); BufFileClose(fd); /* discard the subxacts added later */ @@ -1355,7 +1355,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) errmsg_internal("transaction %u not found in stream XID hash table", xid))); - fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2542,6 +2542,30 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } /* + * Cleanup filesets. + */ +void +logicalrep_worker_cleanupfileset(void) +{ + HASH_SEQ_STATUS status; + StreamXidHash *hentry; + + /* Remove all the pending stream and subxact filesets. */ + if (xidhash) + { + hash_seq_init(&status, xidhash); + while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) + { + FileSetDeleteAll(hentry->stream_fileset); + + /* Delete the subxact fileset iff it is created. */ + if (hentry->subxact_fileset) + FileSetDeleteAll(hentry->subxact_fileset); + } + } +} + +/* * Apply main loop. */ static void @@ -3024,7 +3048,7 @@ subxact_info_write(Oid subid, TransactionId xid) if (ent->subxact_fileset) { cleanup_subxact_info(); - SharedFileSetDeleteAll(ent->subxact_fileset); + FileSetDeleteAll(ent->subxact_fileset); pfree(ent->subxact_fileset); ent->subxact_fileset = NULL; } @@ -3042,18 +3066,18 @@ subxact_info_write(Oid subid, TransactionId xid) MemoryContext oldctx; /* - * We need to maintain shared fileset across multiple stream - * start/stop calls. So, need to allocate it in a persistent context. + * We need to maintain fileset across multiple stream start/stop + * calls. So, need to allocate it in a persistent context. */ oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(SharedFileSet)); - SharedFileSetInit(ent->subxact_fileset, NULL); + ent->subxact_fileset = palloc(sizeof(FileSet)); + FileSetInit(ent->subxact_fileset); MemoryContextSwitchTo(oldctx); - fd = BufFileCreateShared(ent->subxact_fileset, path); + fd = BufFileCreateFileSet(ent->subxact_fileset, path); } else - fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3107,7 +3131,7 @@ subxact_info_read(Oid subid, TransactionId xid) subxact_filename(path, subid, xid); - fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3264,7 +3288,7 @@ stream_cleanup_files(Oid subid, TransactionId xid) /* Delete the change file and release the stream fileset memory */ changes_filename(path, subid, xid); - SharedFileSetDeleteAll(ent->stream_fileset); + FileSetDeleteAll(ent->stream_fileset); pfree(ent->stream_fileset); ent->stream_fileset = NULL; @@ -3272,7 +3296,7 @@ stream_cleanup_files(Oid subid, TransactionId xid) if (ent->subxact_fileset) { subxact_filename(path, subid, xid); - SharedFileSetDeleteAll(ent->subxact_fileset); + FileSetDeleteAll(ent->subxact_fileset); pfree(ent->subxact_fileset); ent->subxact_fileset = NULL; } @@ -3288,8 +3312,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the shared fileset and create the - * buffile, otherwise open the previously created file. + * changes for this transaction, initialize the fileset and create the buffile, + * otherwise open the previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. @@ -3330,7 +3354,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) if (first_segment) { MemoryContext savectx; - SharedFileSet *fileset; + FileSet *fileset; if (found) ereport(ERROR, @@ -3338,16 +3362,16 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); /* - * We need to maintain shared fileset across multiple stream - * start/stop calls. So, need to allocate it in a persistent context. + * We need to maintain fileset across multiple stream start/stop + * calls. So, need to allocate it in a persistent context. */ savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(SharedFileSet)); + fileset = palloc(sizeof(FileSet)); - SharedFileSetInit(fileset, NULL); + FileSetInit(fileset); MemoryContextSwitchTo(savectx); - stream_fd = BufFileCreateShared(fileset, path); + stream_fd = BufFileCreateFileSet(fileset, path); /* Remember the fileset for the next stream of the same transaction */ ent->xid = xid; @@ -3365,7 +3389,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) * Open the file and seek to the end of the file because we always * append the changes file. */ - stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); BufFileSeek(stream_fd, 0, 0, SEEK_END); } |