aboutsummaryrefslogtreecommitdiff
path: root/src/include/replication/output_plugin.h
blob: b78c796450a18330eda89a1bde95396670ebb9b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*-------------------------------------------------------------------------
 * output_plugin.h
 *	   PostgreSQL Logical Decode Plugin Interface
 *
 * Copyright (c) 2012-2020, PostgreSQL Global Development Group
 *
 *-------------------------------------------------------------------------
 */
#ifndef OUTPUT_PLUGIN_H
#define OUTPUT_PLUGIN_H

#include "replication/reorderbuffer.h"

struct LogicalDecodingContext;
struct OutputPluginCallbacks;

typedef enum OutputPluginOutputType
{
	OUTPUT_PLUGIN_BINARY_OUTPUT,
	OUTPUT_PLUGIN_TEXTUAL_OUTPUT
} OutputPluginOutputType;

/*
 * Options set by the output plugin, in the startup callback.
 */
typedef struct OutputPluginOptions
{
	OutputPluginOutputType output_type;
	bool		receive_rewrites;
} OutputPluginOptions;

/*
 * Type of the shared library symbol _PG_output_plugin_init that is looked up
 * when loading an output plugin shared library.
 */
typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

/*
 * Callback that gets called in a user-defined plugin. ctx->private_data can
 * be set to some private data.
 *
 * "is_init" will be set to "true" if the decoding slot just got defined. When
 * the same slot is used from there one, it will be "false".
 */
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
										OutputPluginOptions *options,
										bool is_init);

/*
 * Callback called for every (explicit or implicit) BEGIN of a successful
 * transaction.
 */
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
									  ReorderBufferTXN *txn);

/*
 * Callback for every individual change in a successful transaction.
 */
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
									   ReorderBufferTXN *txn,
									   Relation relation,
									   ReorderBufferChange *change);

/*
 * Callback for every TRUNCATE in a successful transaction.
 */
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
										 ReorderBufferTXN *txn,
										 int nrelations,
										 Relation relations[],
										 ReorderBufferChange *change);

/*
 * Called for every (explicit or implicit) COMMIT of a successful transaction.
 */
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
									   ReorderBufferTXN *txn,
									   XLogRecPtr commit_lsn);

/*
 * Called for the generic logical decoding messages.
 */
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
										ReorderBufferTXN *txn,
										XLogRecPtr message_lsn,
										bool transactional,
										const char *prefix,
										Size message_size,
										const char *message);

/*
 * Filter changes by origin.
 */
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
											   RepOriginId origin_id);

/*
 * Called to shutdown an output plugin.
 */
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

/*
 * Called when starting to stream a block of changes from in-progress
 * transaction (may be called repeatedly, if it's streamed in multiple
 * chunks).
 */
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx,
											ReorderBufferTXN *txn);

/*
 * Called when stopping to stream a block of changes from in-progress
 * transaction to a remote node (may be called repeatedly, if it's streamed
 * in multiple chunks).
 */
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
										   ReorderBufferTXN *txn);

/*
 * Called to discard changes streamed to remote node from in-progress
 * transaction.
 */
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
											ReorderBufferTXN *txn,
											XLogRecPtr abort_lsn);

/*
 * Called to apply changes streamed to remote node from in-progress
 * transaction.
 */
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
											 ReorderBufferTXN *txn,
											 XLogRecPtr commit_lsn);

/*
 * Callback for streaming individual changes from in-progress transactions.
 */
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
											 ReorderBufferTXN *txn,
											 Relation relation,
											 ReorderBufferChange *change);

/*
 * Callback for streaming generic logical decoding messages from in-progress
 * transactions.
 */
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx,
											  ReorderBufferTXN *txn,
											  XLogRecPtr message_lsn,
											  bool transactional,
											  const char *prefix,
											  Size message_size,
											  const char *message);

/*
 * Callback for streaming truncates from in-progress transactions.
 */
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
											   ReorderBufferTXN *txn,
											   int nrelations,
											   Relation relations[],
											   ReorderBufferChange *change);

/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks
{
	LogicalDecodeStartupCB startup_cb;
	LogicalDecodeBeginCB begin_cb;
	LogicalDecodeChangeCB change_cb;
	LogicalDecodeTruncateCB truncate_cb;
	LogicalDecodeCommitCB commit_cb;
	LogicalDecodeMessageCB message_cb;
	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
	LogicalDecodeShutdownCB shutdown_cb;
	/* streaming of changes */
	LogicalDecodeStreamStartCB stream_start_cb;
	LogicalDecodeStreamStopCB stream_stop_cb;
	LogicalDecodeStreamAbortCB stream_abort_cb;
	LogicalDecodeStreamCommitCB stream_commit_cb;
	LogicalDecodeStreamChangeCB stream_change_cb;
	LogicalDecodeStreamMessageCB stream_message_cb;
	LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;

/* Functions in replication/logical/logical.c */
extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);

#endif							/* OUTPUT_PLUGIN_H */