aboutsummaryrefslogtreecommitdiff
path: root/src/test/subscription/t/026_worker_stats.pl
blob: e64e0a74b87fa3ee090738ccce1058b0517caf48 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Copyright (c) 2021, PostgreSQL Global Development Group

# Tests for subscription error stats.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More tests => 5;

# Wait until pg_stat_subscription_workers reports an error for the given
# relation, then check that the reported error matches the expectation.
#
# Arguments:
#   $node           - node to run the queries on; poll_query_until() and
#                     safe_psql() are called on it against database 'postgres'
#   $relname        - relation name, compared against last_error_relid
#   $xid            - transaction ID compared against last_error_xid while
#                     waiting; pass '' (or undef) to wait on the relation only
#   $expected_error - expected query output, e.g. 'tap_sub|INSERT|test_tab1|t'
#                     (subname, last_error_command, last_error_relid and
#                     whether last_error_count > 0)
#   $msg            - test description handed to is()
#
# Dies if poll_query_until() returns false.
sub test_subscription_error
{
    my ($node, $relname, $xid, $expected_error, $msg) = @_;

    # Make a failing is() below point at our caller's line instead of at this
    # helper, so the failing check can be identified from the test output.
    local $Test::Builder::Level = $Test::Builder::Level + 1;

    my $check_sql = qq[
SELECT count(1) > 0 FROM pg_stat_subscription_workers
WHERE last_error_relid = '$relname'::regclass];

    # Only filter on the transaction ID when the caller supplied one.
    $check_sql .= " AND last_error_xid = '$xid'::xid;"
      if defined $xid && $xid ne '';

    # Wait for the error statistics to be updated.
    $node->poll_query_until('postgres', $check_sql)
      or die "Timed out while waiting for statistics to be updated";

    # Fetch the reported error. Note that this query is keyed on the relation
    # only; the xid is used solely for the wait above.
    my $result = $node->safe_psql(
        'postgres',
        qq[
SELECT subname, last_error_command, last_error_relid::regclass, last_error_count > 0
FROM pg_stat_subscription_workers
WHERE last_error_relid = '$relname'::regclass;
]);
    is($result, $expected_error, $msg);
}

# Publisher: create, initialize with allows_streaming => 'logical', start.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;

# Subscriber: create and initialize the same way. It is started only after
# its configuration has been adjusted below.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');

# The subscriber is going to fail in an endless loop during this test. Set
# wal_retrieve_retry_interval so that the server log is not flooded with
# error messages.
my $subscriber_conf = qq[
wal_retrieve_retry_interval = 2s
];
$node_subscriber->append_conf('postgresql.conf', $subscriber_conf);
$node_subscriber->start;

# Table setup. The publisher gets two plain tables holding one row each.
my $publisher_tables_sql = qq[
BEGIN;
CREATE TABLE test_tab1 (a int);
CREATE TABLE test_tab2 (a int);
INSERT INTO test_tab1 VALUES (1);
INSERT INTO test_tab2 VALUES (1);
COMMIT;
];
$node_publisher->safe_psql('postgres', $publisher_tables_sql);

# The subscriber gets the same tables, but with primary keys, and test_tab2
# is pre-populated with a row that conflicts with the data replicated from
# the publisher later on.
my $subscriber_tables_sql = qq[
BEGIN;
CREATE TABLE test_tab1 (a int primary key);
CREATE TABLE test_tab2 (a int primary key);
INSERT INTO test_tab2 VALUES (1);
COMMIT;
];
$node_subscriber->safe_psql('postgres', $subscriber_tables_sql);

# Publish both tables. The connection string is used further down when the
# subscription is created.
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2;");

# As long as logical replication has not been started, the view must not
# contain any subscription error.
my $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(1) FROM pg_stat_subscription_workers");
is($result, '0', 'check no subscription error');

# Create the subscription. For test_tab2 the table sync on tap_sub violates
# the unique constraint and therefore keeps failing in an endless loop.
$node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (streaming = off);"
);

$node_publisher->wait_for_catchup('tap_sub');

# test_tab1 has no conflicting row, so wait until its initial table sync is
# done (srsubstate 'r' or 's').
my $tab1_synced_sql = qq[
SELECT count(1) = 1 FROM pg_subscription_rel
WHERE srrelid = 'test_tab1'::regclass AND srsubstate in ('r', 's')
];
$node_subscriber->poll_query_until('postgres', $tab1_synced_sql)
  or die "Timed out while waiting for subscriber to synchronize data";

# The single publisher row of test_tab1 must have arrived on the subscriber.
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(a) FROM test_tab1");
is($result, '1', 'check initial data are copied to subscriber');

# Insert a row into test_tab1 on the publisher that already exists on the
# subscriber. Applying it violates the unique constraint on test_tab1 and
# raises an error on the subscriber. The transaction ID of the insert is
# captured so that the reported error can be matched against it.
my $conflicting_insert_sql = qq[
BEGIN;
INSERT INTO test_tab1 VALUES (1);
SELECT pg_current_xact_id()::xid;
COMMIT;
];
my $xid = $node_publisher->safe_psql('postgres', $conflicting_insert_sql);

# The apply worker's error is expected for test_tab1, for the xid above.
test_subscription_error(
    $node_subscriber, 'test_tab1', $xid,
    'tap_sub|INSERT|test_tab1|t',
    'check the error reported by the apply worker');

# The table sync worker's error is expected for test_tab2; no xid is passed
# for this check and the expected command column is empty.
test_subscription_error(
    $node_subscriber, 'test_tab2', '',
    'tap_sub||test_tab2|t',
    'check the error reported by the table sync worker');

# Let replication make progress again: truncate test_tab1 and test_tab2 on
# the subscriber so that applying changes and table sync, respectively, no
# longer violate the unique constraints and can continue.
$node_subscriber->safe_psql('postgres', "TRUNCATE test_tab1, test_tab2;");

# Wait for the data to be replicated into both tables. As with the other
# waits in this file, a false return from poll_query_until() is treated as a
# timeout and aborts the test instead of being silently ignored.
$node_subscriber->poll_query_until('postgres',
    "SELECT count(1) > 0 FROM test_tab1")
  or die "Timed out while waiting for test_tab1 to be replicated";
$node_subscriber->poll_query_until('postgres',
    "SELECT count(1) > 0 FROM test_tab2")
  or die "Timed out while waiting for test_tab2 to be replicated";

# Once the subscription has been dropped, the view must not list any error
# anymore.
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub;");
$result = $node_subscriber->safe_psql('postgres',
    "SELECT count(1) FROM pg_stat_subscription_workers");
is($result, '0', 'no error after dropping subscription');

# Shut down both nodes, subscriber first.
$_->stop('fast') for ($node_subscriber, $node_publisher);