diff options
Diffstat (limited to 'src/include/replication/worker_internal.h')
-rw-r--r-- | src/include/replication/worker_internal.h | 227 |
1 files changed, 221 insertions, 6 deletions
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 2a3ec5c2d8c..db891eea8ae 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -17,8 +17,13 @@ #include "access/xlogdefs.h" #include "catalog/pg_subscription.h" #include "datatype/timestamp.h" +#include "miscadmin.h" +#include "replication/logicalrelation.h" +#include "storage/buffile.h" #include "storage/fileset.h" #include "storage/lock.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" #include "storage/spin.h" @@ -53,13 +58,24 @@ typedef struct LogicalRepWorker /* * Used to create the changes and subxact files for the streaming - * transactions. Upon the arrival of the first streaming transaction, the - * fileset will be initialized, and it will be deleted when the worker - * exits. Under this, separate buffiles would be created for each - * transaction which will be deleted after the transaction is finished. + * transactions. Upon the arrival of the first streaming transaction or + * when the first-time leader apply worker times out while sending changes + * to the parallel apply worker, the fileset will be initialized, and it + * will be deleted when the worker exits. Under this, separate buffiles + * would be created for each transaction which will be deleted after the + * transaction is finished. */ FileSet *stream_fileset; + /* + * PID of leader apply worker if this slot is used for a parallel apply + * worker, InvalidPid otherwise. + */ + pid_t apply_leader_pid; + + /* Indicates whether apply can be performed in parallel. */ + bool parallel_apply; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -68,9 +84,138 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +/* + * State of the transaction in parallel apply worker. + * + * The enum values must have the same order as the transaction state + * transitions. + */ +typedef enum ParallelTransState +{ + PARALLEL_TRANS_UNKNOWN, + PARALLEL_TRANS_STARTED, + PARALLEL_TRANS_FINISHED +} ParallelTransState; + +/* + * State of fileset used to communicate changes from leader to parallel + * apply worker. + * + * FS_EMPTY indicates an initial state where the leader doesn't need to use + * the file to communicate with the parallel apply worker. + * + * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes + * to the file. + * + * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to + * the file. + * + * FS_READY indicates that it is now ok for a parallel apply worker to + * read the file. + */ +typedef enum PartialFileSetState +{ + FS_EMPTY, + FS_SERIALIZE_IN_PROGRESS, + FS_SERIALIZE_DONE, + FS_READY +} PartialFileSetState; + +/* + * Struct for sharing information between leader apply worker and parallel + * apply workers. + */ +typedef struct ParallelApplyWorkerShared +{ + slock_t mutex; + + TransactionId xid; + + /* + * State used to ensure commit ordering. + * + * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after + * handling the transaction finish commands while the apply leader will + * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in + * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/ + * STREAM_ABORT). + */ + ParallelTransState xact_state; + + /* Information from the corresponding LogicalRepWorker slot. */ + uint16 logicalrep_worker_generation; + int logicalrep_worker_slot_no; + + /* + * Indicates whether there are pending streaming blocks in the queue. The + * parallel apply worker will check it before starting to wait. + */ + pg_atomic_uint32 pending_stream_count; + + /* + * XactLastCommitEnd from the parallel apply worker. This is required by + * the leader worker so it can update the lsn_mappings. + */ + XLogRecPtr last_commit_end; + + /* + * After entering PARTIAL_SERIALIZE mode, the leader apply worker will + * serialize changes to the file, and share the fileset with the parallel + * apply worker when processing the transaction finish command. Then the + * parallel apply worker will apply all the spooled messages. + * + * FileSet is used here instead of SharedFileSet because we need it to + * survive after releasing the shared memory so that the leader apply + * worker can re-use the same fileset for the next streaming transaction. + */ + PartialFileSetState fileset_state; + FileSet fileset; +} ParallelApplyWorkerShared; + +/* + * Information which is used to manage the parallel apply worker. + */ +typedef struct ParallelApplyWorkerInfo +{ + /* + * This queue is used to send changes from the leader apply worker to the + * parallel apply worker. + */ + shm_mq_handle *mq_handle; + + /* + * This queue is used to transfer error messages from the parallel apply + * worker to the leader apply worker. + */ + shm_mq_handle *error_mq_handle; + + dsm_segment *dsm_seg; + + /* + * Indicates whether the leader apply worker needs to serialize the + * remaining changes to a file due to timeout when attempting to send data + * to the parallel apply worker via shared memory. + */ + bool serialize_changes; + + /* + * True if the worker is being used to process a parallel apply + * transaction. False indicates this worker is available for re-use. + */ + bool in_use; + + ParallelApplyWorkerShared *shared; +} ParallelApplyWorkerInfo; + /* Main memory context for apply worker. Permanent during worker lifetime. */ extern PGDLLIMPORT MemoryContext ApplyContext; +extern PGDLLIMPORT MemoryContext ApplyMessageContext; + +extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack; + +extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared; + /* libpqreceiver connection */ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; @@ -84,9 +229,11 @@ extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); -extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, - Oid userid, Oid relid); +extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, + Oid userid, Oid relid, + dsm_handle subworker_dsm); extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker); @@ -103,10 +250,78 @@ extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue); +extern void stream_start_internal(TransactionId xid, bool first_segment); +extern void stream_stop_internal(TransactionId xid); + +/* Common streaming function to apply all the spooled messages */ +extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, + XLogRecPtr lsn); + +extern void apply_dispatch(StringInfo s); + +extern void maybe_reread_subscription(void); + +extern void stream_cleanup_files(Oid subid, TransactionId xid); + +extern void InitializeApplyWorker(void); + +extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn); + +/* Function for apply error callback */ +extern void apply_error_callback(void *arg); +extern void set_apply_error_context_origin(char *originname); + +/* Parallel apply worker setup and interactions */ +extern void pa_allocate_worker(TransactionId xid); +extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); +extern void pa_detach_all_error_mq(void); + +extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, + const void *data); +extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, + bool stream_locked); + +extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared, + ParallelTransState in_xact); +extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo); + +extern void pa_start_subtrans(TransactionId current_xid, + TransactionId top_xid); +extern void pa_reset_subtrans(void); +extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data); +extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared, + PartialFileSetState fileset_state); + +extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode); +extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode); + +extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode); +extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode); + +extern void pa_decr_and_wait_stream_block(void); + +extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, + XLogRecPtr remote_lsn); + +#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid) + static inline bool am_tablesync_worker(void) { return OidIsValid(MyLogicalRepWorker->relid); } +static inline bool +am_leader_apply_worker(void) +{ + return (!am_tablesync_worker() && + !isParallelApplyWorker(MyLogicalRepWorker)); +} + +static inline bool +am_parallel_apply_worker(void) +{ + return isParallelApplyWorker(MyLogicalRepWorker); +} + #endif /* WORKER_INTERNAL_H */ |