1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
# Testing of logical decoding using SQL interface and/or pg_recvlogical
#
# Most logical decoding tests are in contrib/test_decoding. This module
# is for work that doesn't fit well there, like where server restarts
# are required.
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 11;
use Config;
# Initialize master node
my $node_master = get_new_node('master');
$node_master->init(allows_streaming => 1);
$node_master->append_conf(
'postgresql.conf', qq(
wal_level = logical
));
$node_master->start;
my $backup_name = 'master_backup';
$node_master->safe_psql('postgres',
qq[CREATE TABLE decoding_test(x integer, y text);]);
$node_master->safe_psql('postgres',
qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
$node_master->safe_psql('postgres',
qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
);
# Basic decoding works
my ($result) = $node_master->safe_psql('postgres',
qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
is(scalar(my @foobar = split /^/m, $result),
12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
# If we immediately crash the server we might lose the progress we just made
# and replay the same changes again. But a clean shutdown should never repeat
# the same changes when we use the SQL decoding interface.
$node_master->restart('fast');
# There are no new writes, so the result should be empty.
$result = $node_master->safe_psql('postgres',
qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
chomp($result);
is($result, '', 'Decoding after fast restart repeats no rows');
# Insert some rows and verify that we get the same results from pg_recvlogical
# and the SQL interface.
$node_master->safe_psql('postgres',
qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);
my $expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT};
my $stdout_sql = $node_master->safe_psql('postgres',
qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);
is($stdout_sql, $expected, 'got expected output from SQL decoding session');
my $endpos = $node_master->safe_psql('postgres',
"SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);
print "waiting to replay $endpos\n";
my $stdout_recv = $node_master->pg_recvlogical_upto(
'postgres', 'test_slot', $endpos, 180,
'include-xids' => '0',
'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, $expected,
'got same expected output from pg_recvlogical decoding session');
$node_master->poll_query_until('postgres',
"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'test_slot' AND active_pid IS NULL)"
)
or die "slot never became inactive";
$stdout_recv = $node_master->pg_recvlogical_upto(
'postgres', 'test_slot', $endpos, 180,
'include-xids' => '0',
'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, '',
'pg_recvlogical acknowledged changes, nothing pending on slot');
$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
is( $node_master->psql(
'otherdb',
"SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
),
3,
'replaying logical slot from another database fails');
$node_master->safe_psql('otherdb',
qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]
);
# make sure you can't drop a slot while active
SKIP:
{
# some Windows Perls at least don't like IPC::Run's start/kill_kill regime.
skip "Test fails on Windows perl", 2 if $Config{osname} eq 'MSWin32';
my $pg_recvlogical = IPC::Run::start(
[ 'pg_recvlogical', '-d', $node_master->connstr('otherdb'),
'-S', 'otherdb_slot', '-f', '-', '--start' ]);
$node_master->poll_query_until('otherdb',
"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)"
) or die "slot never became active";
is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
3, 'dropping a DB with active logical slots fails');
$pg_recvlogical->kill_kill;
is($node_master->slot('otherdb_slot')->{'slot_name'},
undef, 'logical slot still exists');
}
$node_master->poll_query_until('otherdb',
"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)"
) or die "slot never became inactive";
is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
0, 'dropping a DB with inactive logical slots succeeds');
is($node_master->slot('otherdb_slot')->{'slot_name'},
undef, 'logical slot was actually dropped with DB');
# Test to ensure that we don't run out of file descriptors even if there
# are more spill files than maxAllocatedDescs.
# Set max_files_per_process to a small value to make it more likely to run out
# of max open file descriptors.
$node_master->safe_psql('postgres',
'ALTER SYSTEM SET max_files_per_process = 26;');
$node_master->restart;
$node_master->safe_psql(
'postgres', q{
do $$
BEGIN
FOR i IN 1..10 LOOP
BEGIN
INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
EXCEPTION
when division_by_zero then perform 'dummy';
END;
END LOOP;
END $$;
});
$result = $node_master->safe_psql('postgres',
qq[
SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
]);
$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
is($result, $expected, 'got expected output from spilling subxacts session');
# Reset back max_files_per_process
$node_master->safe_psql('postgres',
'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
$node_master->restart;
# done with the node
$node_master->stop;
|