1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
# INSERT ... ON CONFLICT test verifying that speculative insertion
# failures are handled
#
# Does this by using advisory locks controlling progress of
# insertions. By waiting when building the index keys, it's possible
# to schedule concurrent INSERT ON CONFLICTs so that there will always
# be a speculative conflict.
setup
{
CREATE OR REPLACE FUNCTION blurt_and_lock_123(text) RETURNS text IMMUTABLE LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE 'blurt_and_lock_123() called for % in session %', $1, current_setting('spec.session')::int;
-- depending on lock state, wait for lock 2 or 3
IF pg_try_advisory_xact_lock(current_setting('spec.session')::int, 1) THEN
RAISE NOTICE 'acquiring advisory lock on 2';
PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 2);
ELSE
RAISE NOTICE 'acquiring advisory lock on 3';
PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 3);
END IF;
RETURN $1;
END;$$;
CREATE OR REPLACE FUNCTION blurt_and_lock_4(text) RETURNS text IMMUTABLE LANGUAGE plpgsql AS $$
BEGIN
RAISE NOTICE 'blurt_and_lock_4() called for % in session %', $1, current_setting('spec.session')::int;
RAISE NOTICE 'acquiring advisory lock on 4';
PERFORM pg_advisory_xact_lock(current_setting('spec.session')::int, 4);
RETURN $1;
END;$$;
CREATE OR REPLACE FUNCTION ctoast_large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 256) g';
CREATE TABLE upserttest(key text, data text);
CREATE UNIQUE INDEX upserttest_key_uniq_idx ON upserttest((blurt_and_lock_123(key)));
}
teardown
{
DROP TABLE upserttest;
}
session "controller"
setup
{
SET default_transaction_isolation = 'read committed';
SET application_name = 'isolation/insert-conflict-specconflict-controller';
}
step "controller_locks" {SELECT pg_advisory_lock(sess, lock), sess, lock FROM generate_series(1, 2) a(sess), generate_series(1,3) b(lock);}
step "controller_unlock_1_1" { SELECT pg_advisory_unlock(1, 1); }
step "controller_unlock_2_1" { SELECT pg_advisory_unlock(2, 1); }
step "controller_unlock_1_2" { SELECT pg_advisory_unlock(1, 2); }
step "controller_unlock_2_2" { SELECT pg_advisory_unlock(2, 2); }
step "controller_unlock_1_3" { SELECT pg_advisory_unlock(1, 3); }
step "controller_unlock_2_3" { SELECT pg_advisory_unlock(2, 3); }
step "controller_lock_2_4" { SELECT pg_advisory_lock(2, 4); }
step "controller_unlock_2_4" { SELECT pg_advisory_unlock(2, 4); }
step "controller_show" {SELECT * FROM upserttest; }
step "controller_show_count" {SELECT COUNT(*) FROM upserttest; }
step "controller_print_speculative_locks" {
SELECT pa.application_name, locktype, mode, granted
FROM pg_locks pl JOIN pg_stat_activity pa USING (pid)
WHERE
locktype IN ('spectoken', 'transactionid')
AND pa.datname = current_database()
AND pa.application_name LIKE 'isolation/insert-conflict-specconflict-s%'
ORDER BY 1, 2, 3, 4;
}
session "s1"
setup
{
SET default_transaction_isolation = 'read committed';
SET spec.session = 1;
SET application_name = 'isolation/insert-conflict-specconflict-s1';
}
step "s1_begin" { BEGIN; }
step "s1_create_non_unique_index" { CREATE INDEX upserttest_key_idx ON upserttest((blurt_and_lock_4(key))); }
step "s1_confirm_index_order" { SELECT 'upserttest_key_uniq_idx'::regclass::int8 < 'upserttest_key_idx'::regclass::int8; }
step "s1_upsert" { INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s1') ON CONFLICT (blurt_and_lock_123(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s1'; }
step "s1_insert_toast" { INSERT INTO upserttest VALUES('k2', ctoast_large_val()) ON CONFLICT DO NOTHING; }
step "s1_commit" { COMMIT; }
session "s2"
setup
{
SET default_transaction_isolation = 'read committed';
SET spec.session = 2;
SET application_name = 'isolation/insert-conflict-specconflict-s2';
}
step "s2_begin" { BEGIN; }
step "s2_upsert" { INSERT INTO upserttest(key, data) VALUES('k1', 'inserted s2') ON CONFLICT (blurt_and_lock_123(key)) DO UPDATE SET data = upserttest.data || ' with conflict update s2'; }
step "s2_insert_toast" { INSERT INTO upserttest VALUES('k2', ctoast_large_val()) ON CONFLICT DO NOTHING; }
step "s2_commit" { COMMIT; }
# Test that speculative locks are correctly acquired and released, s2
# inserts, s1 updates.
permutation
# acquire a number of locks, to control execution flow - the
# blurt_and_lock_123 function acquires advisory locks that allow us to
# continue after a) the optimistic conflict probe b) after the
# insertion of the speculative tuple.
"controller_locks"
"controller_show"
"s1_upsert" "s2_upsert"
"controller_show"
# Switch both sessions to wait on the other lock next time (the speculative insertion)
"controller_unlock_1_1" "controller_unlock_2_1"
# Allow both sessions to continue
"controller_unlock_1_3" "controller_unlock_2_3"
"controller_show"
# Allow the second session to finish insertion
"controller_unlock_2_2"
# This should now show a successful insertion
"controller_show"
# Allow the first session to finish insertion
"controller_unlock_1_2"
# This should now show a successful UPSERT
"controller_show"
# Test that speculative locks are correctly acquired and released, s1
# inserts, s2 updates.
permutation
# acquire a number of locks, to control execution flow - the
# blurt_and_lock_123 function acquires advisory locks that allow us to
# continue after a) the optimistic conflict probe b) after the
# insertion of the speculative tuple.
"controller_locks"
"controller_show"
"s1_upsert" "s2_upsert"
"controller_show"
# Switch both sessions to wait on the other lock next time (the speculative insertion)
"controller_unlock_1_1" "controller_unlock_2_1"
# Allow both sessions to continue
"controller_unlock_1_3" "controller_unlock_2_3"
"controller_show"
# Allow the first session to finish insertion
"controller_unlock_1_2"
# This should now show a successful insertion
"controller_show"
# Allow the second session to finish insertion
"controller_unlock_2_2"
# This should now show a successful UPSERT
"controller_show"
# Test that speculatively inserted toast rows do not cause conflicts.
# s1 inserts successfully, s2 does not.
permutation
# acquire a number of locks, to control execution flow - the
# blurt_and_lock_123 function acquires advisory locks that allow us to
# continue after a) the optimistic conflict probe b) after the
# insertion of the speculative tuple.
"controller_locks"
"controller_show"
"s1_insert_toast" "s2_insert_toast"
"controller_show"
# Switch both sessions to wait on the other lock next time (the speculative insertion)
"controller_unlock_1_1" "controller_unlock_2_1"
# Allow both sessions to continue
"controller_unlock_1_3" "controller_unlock_2_3"
"controller_show"
# Allow the first session to finish insertion
"controller_unlock_1_2"
# This should now show that 1 additional tuple was inserted successfully
"controller_show_count"
# Allow the second session to finish insertion and kill the speculatively inserted tuple
"controller_unlock_2_2"
# This should show the same number of tuples as before s2 inserted
"controller_show_count"
# Test that speculative locks are correctly acquired and released, s2
# inserts, s1 updates. With the added complication that transactions
# don't immediately commit.
permutation
# acquire a number of locks, to control execution flow - the
# blurt_and_lock_123 function acquires advisory locks that allow us to
# continue after a) the optimistic conflict probe b) after the
# insertion of the speculative tuple.
"controller_locks"
"controller_show"
"s1_begin" "s2_begin"
"s1_upsert" "s2_upsert"
"controller_show"
# Switch both sessions to wait on the other lock next time (the speculative insertion)
"controller_unlock_1_1" "controller_unlock_2_1"
# Allow both sessions to continue
"controller_unlock_1_3" "controller_unlock_2_3"
"controller_show"
# Allow the first session to finish insertion
"controller_unlock_1_2"
# But the change isn't visible yet, nor should the second session continue
"controller_show"
# Allow the second session to finish insertion, but it's blocked
"controller_unlock_2_2"
"controller_show"
# But committing should unblock
"s1_commit"
"controller_show"
"s2_commit"
"controller_show"
# Test that speculative wait is performed if a session sees a speculatively
# inserted tuple. A speculatively inserted tuple is one which has been inserted
# both into the table and the unique index but has yet to *complete* the
# speculative insertion
permutation
# acquire a number of advisory locks to control execution flow - the
# blurt_and_lock_123 function acquires advisory locks that allow us to
# continue after a) the optimistic conflict probe and b) after the
# insertion of the speculative tuple.
# blurt_and_lock_4 acquires an advisory lock which allows us to pause
# execution c) before completing the speculative insertion
# create the second index here to avoid affecting the other
# permutations.
"s1_create_non_unique_index"
# confirm that the insertion into the unique index will happen first
"s1_confirm_index_order"
"controller_locks"
"controller_show"
"s2_begin"
# Both sessions wait on advisory locks
"s1_upsert" "s2_upsert"
"controller_show"
# Switch both sessions to wait on the other lock next time (the speculative insertion)
"controller_unlock_1_1" "controller_unlock_2_1"
# Allow both sessions to do the optimistic conflict probe and do the
# speculative insertion into the table
# They will then be waiting on another advisory lock when they attempt to
# update the index
"controller_unlock_1_3" "controller_unlock_2_3"
"controller_show"
# take lock to block second session after inserting in unique index but
# before completing the speculative insert
"controller_lock_2_4"
# Allow the second session to move forward
"controller_unlock_2_2"
# This should still not show a successful insertion
"controller_show"
# Allow the first session to continue, it should perform speculative wait
"controller_unlock_1_2"
# Should report s1 is waiting on speculative lock
"controller_print_speculative_locks"
# Allow s2 to insert into the non-unique index and complete s1 will
# no longer wait on speculative lock, but proceed to wait on the
# transaction to finish.
"controller_unlock_2_4"
# Should report that s1 is now waiting for s1 to commit
"controller_print_speculative_locks"
# Once s2 commits, s1 is finally free to continue to update
"s2_commit"
# This should now show a successful UPSERT
"controller_show"
# Ensure no unexpected locks survive
"controller_print_speculative_locks"
|