aboutsummaryrefslogtreecommitdiff
path: root/src/test/subscription/t/026_stats.pl
blob: a42ea3170ecd3382841fbc0027b5d685899a5d27 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# Copyright (c) 2021-2022, PostgreSQL Global Development Group

# Tests for subscription stats.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Both nodes are initialized with the same option list
# (allows_streaming => 'logical').
my %logical_init = (allows_streaming => 'logical');

# Bring up the publisher node.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(%logical_init);
$node_publisher->start();

# Bring up the subscriber node.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(%logical_init);
$node_subscriber->start();

# Initial table setup. The publisher gets test_tab1 without any constraint,
# while the subscriber gets the same table with a primary key. Both sides
# insert the value 1, so the data replicated from the publisher later will
# conflict with the row already present on the subscriber.
my $publisher_setup_sql = q{
BEGIN;
CREATE TABLE test_tab1 (a int);
INSERT INTO test_tab1 VALUES (1);
COMMIT;
};
my $subscriber_setup_sql = q{
BEGIN;
CREATE TABLE test_tab1 (a int primary key);
INSERT INTO test_tab1 VALUES (1);
COMMIT;
};
$node_publisher->safe_psql('postgres', $publisher_setup_sql);
$node_subscriber->safe_psql('postgres', $subscriber_setup_sql);

# Build the connection string pointing at the publisher's postgres database,
# then publish test_tab1 as tap_pub.
my $publisher_connstr = $node_publisher->connstr;
$publisher_connstr .= ' dbname=postgres';
my $create_publication_sql = 'CREATE PUBLICATION tap_pub FOR TABLE test_tab1;';
$node_publisher->safe_psql('postgres', $create_publication_sql);

# No subscription has been created yet, so the subscription stats view is
# expected to contain no rows (and therefore no errors) at this point.
my $stats_count_sql = 'SELECT count(1) FROM pg_stat_subscription_stats';
my $result = $node_subscriber->safe_psql('postgres', $stats_count_sql);
is($result, '0', 'check no subscription error');

# Subscribe to tap_pub. The subscriber's test_tab1 already holds the value
# that the initial copy brings over, so the tablesync for test_tab1 on tap_sub
# is expected to keep failing on the unique constraint until that row is
# removed.
my $create_subscription_sql =
	"CREATE SUBSCRIPTION tap_sub CONNECTION '"
  . $publisher_connstr
  . "' PUBLICATION tap_pub;";
$node_subscriber->safe_psql('postgres', $create_subscription_sql);

$node_publisher->wait_for_catchup('tap_sub');

# Poll the subscription stats until a tablesync error has been counted.
my $sync_error_seen_sql = q{
SELECT sync_error_count > 0
FROM pg_stat_subscription_stats
WHERE subname = 'tap_sub'
};
unless ($node_subscriber->poll_query_until('postgres', $sync_error_seen_sql))
{
	die "Timed out while waiting for tablesync error";
}

# Remove the conflicting row on the subscriber so that the tablesync worker
# can continue.
$node_subscriber->safe_psql('postgres', 'TRUNCATE test_tab1;');

# Poll pg_subscription_rel until test_tab1 is reported in state 'r' or 's',
# i.e. the initial tablesync for test_tab1 has finished.
my $tab1_synced_sql = q{
SELECT count(1) = 1 FROM pg_subscription_rel
WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's')
};
unless ($node_subscriber->poll_query_until('postgres', $tab1_synced_sql))
{
	die "Timed out while waiting for subscriber to synchronize data";
}

# Check test_tab1 on the subscriber has one row. The subscriber's own row was
# truncated before the tablesync completed, so the only row expected here is
# the one copied from the publisher, whose value is 1.
$result = $node_subscriber->safe_psql('postgres', "SELECT a FROM test_tab1");
is($result, qq(1), 'check the table has one row');

# Insert a value on the publisher that already exists on the subscriber.
# Applying this change violates the unique constraint on the subscriber's
# test_tab1 and so raises an error there.
$node_publisher->safe_psql('postgres', 'INSERT INTO test_tab1 VALUES (1)');

# Poll the subscription stats until an apply error has been counted.
my $apply_error_seen_sql = q{
SELECT apply_error_count > 0
FROM pg_stat_subscription_stats
WHERE subname = 'tap_sub'
};
unless ($node_subscriber->poll_query_until('postgres', $apply_error_seen_sql))
{
	die "Timed out while waiting for apply error";
}

# Clear test_tab1 on the subscriber so that the apply worker can continue.
$node_subscriber->safe_psql('postgres', 'TRUNCATE test_tab1;');

# Shut down the subscriber first and then the publisher, both in fast mode.
for my $node ($node_subscriber, $node_publisher)
{
	$node->stop('fast');
}

done_testing();