# Copyright (c) 2021-2025, PostgreSQL Global Development Group

# Test streaming of simple large transaction
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Check that the parallel apply worker has finished applying the streaming
# transaction.
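# The optional "( [A-Z0-9]+:)?" group in the pattern below tolerates the
# SQLSTATE code that is prepended to log messages when log_error_verbosity
# is set to verbose.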
sub check_parallel_log
{
	my ($node_subscriber, $offset, $is_parallel, $type) = @_;

	if ($is_parallel)
	{
		$node_subscriber->wait_for_log(
			qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
			$offset);
	}
}

# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
	my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;

	# Interleave a pair of transactions, each exceeding the 64kB limit.
	my $offset = 0;

	my $h = $node_publisher->background_psql('postgres', on_error_stop => 0);

	# Check the subscriber log from now on.
	$offset = -s $node_subscriber->logfile;

	$h->query_safe(
		q{
	BEGIN;
	INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(3, 5000) s(i);
	UPDATE test_tab SET b = sha256(b) WHERE mod(a,2) = 0;
	DELETE FROM test_tab WHERE mod(a,3) = 0;
	});
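
	# Leave this transaction open so that it interleaves with the next,
	# committed transaction; both exceed logical_decoding_work_mem, so the
	# publisher streams their changes before commit.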

	$node_publisher->safe_psql(
		'postgres', q{
	BEGIN;
	INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(5001, 9999) s(i);
	DELETE FROM test_tab WHERE a > 5000;
	COMMIT;
	});

	$h->query_safe('COMMIT');
	# Errors would make the next test fail, so ignore them here.
	$h->quit;

	$node_publisher->wait_for_catchup($appname);

	check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');

	my $result =
	  $node_subscriber->safe_psql('postgres',
		"SELECT count(*), count(c), count(d = 999) FROM test_tab");
	is($result, qq(3334|3334|3334),
		'check extra columns contain local defaults');

	# Test the streaming in binary mode
	$node_subscriber->safe_psql('postgres',
		"ALTER SUBSCRIPTION tap_sub SET (binary = on)");

	# Check the subscriber log from now on.
	$offset = -s $node_subscriber->logfile;

	# Insert, update and delete enough rows to exceed the 64kB limit.
	$node_publisher->safe_psql(
		'postgres', q{
	BEGIN;
	INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(5001, 10000) s(i);
	UPDATE test_tab SET b = sha256(b) WHERE mod(a,2) = 0;
	DELETE FROM test_tab WHERE mod(a,3) = 0;
	COMMIT;
	});

	$node_publisher->wait_for_catchup($appname);

	check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');

	$result =
	  $node_subscriber->safe_psql('postgres',
		"SELECT count(*), count(c), count(d = 999) FROM test_tab");
	is($result, qq(6667|6667|6667),
		'check extra columns contain local defaults');

	# Change the local values of the extra columns on the subscriber,
	# update the publisher, and check that the subscriber retains the
	# expected values.  This ensures that non-streaming transactions behave
	# properly after a streaming transaction.
	$node_subscriber->safe_psql('postgres',
		"UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"
	);

	# Check the subscriber log from now on.
	$offset = -s $node_subscriber->logfile;

	$node_publisher->safe_psql('postgres',
		"UPDATE test_tab SET b = sha256(a::text::bytea)");

	$node_publisher->wait_for_catchup($appname);

	check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');

	$result = $node_subscriber->safe_psql('postgres',
		"SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"
	);
	is($result, qq(6667|6667|6667),
		'check extra columns contain locally changed data');

	# Clean up the test data
	$node_publisher->safe_psql('postgres',
		"DELETE FROM test_tab WHERE (a > 2)");
	$node_publisher->wait_for_catchup($appname);
}

# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
	'logical_decoding_work_mem = 64kB');
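# A small logical_decoding_work_mem forces any transaction larger than 64kB
# to be streamed (or serialized) instead of being buffered in memory until
# commit.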
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->start;

# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
	"CREATE TABLE test_tab (a int primary key, b bytea)");
$node_publisher->safe_psql('postgres',
	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");

$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");

# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE test_tab (a int primary key, b bytea, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
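# Only columns a and b are replicated; c and d exist solely on the
# subscriber and must therefore be filled in from their local defaults.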

$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
$node_subscriber->safe_psql('postgres',
	"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");

my $appname = 'tap_sub';

################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
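# With streaming = on, the apply worker writes each streamed chunk to a
# temporary file and replays the changes when the stream is committed.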

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);

my $result =
  $node_subscriber->safe_psql('postgres',
	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');

test_streaming($node_publisher, $node_subscriber, $appname, 0);

######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
	"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
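# Changing the streaming option restarts the apply worker, which then
# reconnects to the publisher; a new walsender PID confirms the restart.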

$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)");

$node_publisher->poll_query_until('postgres',
	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
  )
  or die
  "Timed out while waiting for apply to restart after changing SUBSCRIPTION";

# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});

test_streaming($node_publisher, $node_subscriber, $appname, 1);

# Test that a deadlock between the leader apply worker and a parallel apply
# worker is detected.

$node_subscriber->append_conf('postgresql.conf', "deadlock_timeout = 10ms");
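# A very short deadlock_timeout makes the deadlock detector run almost
# immediately, keeping the deadlock tests below fast.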
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});

# Interleave a pair of transactions, each exceeding the 64kB limit.
my $h = $node_publisher->background_psql('postgres', on_error_stop => 0);

# Confirm that a deadlock between the leader apply worker and the parallel
# apply worker is detected.
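# The streamed transaction is applied by a parallel apply worker, which
# cannot finish until the leader sends the rest of the stream.  The leader,
# in turn, applies the non-streamed insert of the same key and blocks on the
# parallel worker's uncommitted unique-index entry, closing the wait cycle
# that the deadlock detector must break.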

my $offset = -s $node_subscriber->logfile;

$h->query_safe(
	q{
BEGIN;
INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
});

# Ensure that the parallel apply worker executes the insert command before
# the leader apply worker does.
$node_subscriber->wait_for_log(
	qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/,
	$offset);
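# The DEBUG message above confirms that the parallel apply worker has
# consumed the first streamed chunk, so its uncommitted rows are in place
# before the leader applies the conflicting insert.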

$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");

$h->query_safe('COMMIT');
$h->quit;

$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
	$offset);

# Temporarily drop the unique index so that both transactions can complete
# normally without a unique-key conflict.
$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");

# Wait for this streaming transaction to be applied by the apply worker.
$node_publisher->wait_for_catchup($appname);

$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(5001), 'data replicated to subscriber after dropping index');

# Clean up test data from the environment.
$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
$node_publisher->wait_for_catchup($appname);
$node_subscriber->safe_psql('postgres',
	"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");

# Confirm that a deadlock between two parallel apply workers is detected.
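# Here the publisher's second transaction also exceeds the 64kB limit, so it
# too is streamed and applied by a second parallel apply worker; the
# unique-key conflict and the resulting deadlock therefore arise between the
# two parallel apply workers.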

# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;

$h->query_safe(
	q{
BEGIN;
INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
});

# Ensure that the first parallel apply worker executes the insert command
# before the second one.
$node_subscriber->wait_for_log(
	qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/,
	$offset);

$node_publisher->safe_psql('postgres',
	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");

$h->query_safe('COMMIT');
$h->quit;

$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
	$offset);

# Temporarily drop the unique index so that both transactions can complete
# normally without a unique-key conflict.
$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");

# Wait for this streaming transaction to be applied by the apply worker.
$node_publisher->wait_for_catchup($appname);

$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(10000), 'data replicated to subscriber after dropping index');

# Test serializing changes to files and notifying the parallel apply worker
# to apply them at the end of the transaction.
$node_subscriber->append_conf('postgresql.conf',
	'debug_logical_replication_streaming = immediate');
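# In 'immediate' mode the leader skips the shared-memory queue and spills
# every streamed change straight to a file, exercising the file-replay path
# in the parallel apply worker.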
# Reset the log_min_messages to default.
$node_subscriber->append_conf('postgresql.conf',
	"log_min_messages = warning");
$node_subscriber->reload;

# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});

$offset = -s $node_subscriber->logfile;

$node_publisher->safe_psql('postgres',
	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");

# Ensure that the changes are serialized.
$node_subscriber->wait_for_log(
	qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
	$offset);

$node_publisher->wait_for_catchup($appname);

# Check that the transaction was committed on the subscriber.
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(15000),
	'parallel apply worker replayed all changes from file');

$node_subscriber->stop;
$node_publisher->stop;

done_testing();