Fix ON CONFLICT with REINDEX CONCURRENTLY and partitions

When planning queries with ON CONFLICT on partitioned tables, the
indexes to consider as arbiters for each partition are determined based
on those found in the parent table.  However, it's possible for an index
on a partition to be reindexed concurrently, and in that case, the
auxiliary indexes that REINDEX CONCURRENTLY creates on the partition
must be considered as arbiters as well; failing to do that may result in
spurious "duplicate key" errors given sufficient bad luck.

We fix that in this commit by matching every index that doesn't have a
parent to each initially-determined arbiter index.  Every unparented
matching index is considered an additional arbiter index.

This is closely related to the fixes in bc32a12e0d and 2bc7e886fc and,
for the same reasons, is not backpatched (for now) even though it is a
longstanding issue.

Author: Mihail Nikalayeu <mihailnikalayeu@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>
Discussion: https://postgr.es/m/CANtu0ojXmqjmEzp-=aJSxjsdE76iAsRgHBoK0QtYHimb_mEfsg@mail.gmail.com
pull/255/head
Álvaro Herrera 2 weeks ago
parent 4f941d432b
commit 90eae926ab
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
  1. 170
      src/backend/executor/execPartition.c
  2. 3
      src/test/modules/injection_points/Makefile
  3. 238
      src/test/modules/injection_points/expected/reindex-concurrently-upsert-partitioned.out
  4. 3
      src/test/modules/injection_points/meson.build
  5. 113
      src/test/modules/injection_points/specs/reindex-concurrently-upsert-partitioned.spec

@ -15,6 +15,7 @@
#include "access/table.h"
#include "access/tableam.h"
#include "catalog/index.h"
#include "catalog/partition.h"
#include "executor/execPartition.h"
#include "executor/executor.h"
@ -490,6 +491,65 @@ ExecFindPartition(ModifyTableState *mtstate,
return rri;
}
/*
 * IsIndexCompatibleAsArbiter
 *		Decide whether 'indexRelation' may be used as an INSERT ON CONFLICT
 *		arbiter alongside 'arbiterIndexRelation'.
 *
 * Both indexes must be defined on the same table; this is asserted, not
 * checked at run time.  'arbiterIndexInfo' and 'indexInfo' are the IndexInfo
 * structs that correspond to the two relations.
 *
 * Returns false as soon as any compared property differs, true otherwise.
 * Indexes carrying exclusion operators are never reported as compatible.
 */
static bool
IsIndexCompatibleAsArbiter(Relation arbiterIndexRelation,
						   IndexInfo *arbiterIndexInfo,
						   Relation indexRelation,
						   IndexInfo *indexInfo)
{
	int			nkeys;
	List	   *leftover;

	Assert(arbiterIndexRelation->rd_index->indrelid == indexRelation->rd_index->indrelid);

	/*
	 * Cheap IndexInfo-level checks first: uniqueness must agree, neither
	 * side may be an exclusion index (comparing those is unsupported), and
	 * the NULLS NOT DISTINCT setting must agree.
	 */
	if (arbiterIndexInfo->ii_Unique != indexInfo->ii_Unique ||
		arbiterIndexInfo->ii_ExclusionOps != NULL ||
		indexInfo->ii_ExclusionOps != NULL ||
		arbiterIndexInfo->ii_NullsNotDistinct != indexInfo->ii_NullsNotDistinct)
		return false;

	/* Both indexes need the same number of key columns. */
	nkeys = arbiterIndexInfo->ii_NumIndexKeyAttrs;
	if (nkeys != indexInfo->ii_NumIndexKeyAttrs)
		return false;

	/*
	 * Compare the key columns position by position: collation, operator
	 * family and indexed attribute number all have to be the same.
	 */
	for (int keyno = 0; keyno < nkeys; keyno++)
	{
		if (arbiterIndexRelation->rd_indcollation[keyno] != indexRelation->rd_indcollation[keyno] ||
			arbiterIndexRelation->rd_opfamily[keyno] != indexRelation->rd_opfamily[keyno] ||
			arbiterIndexRelation->rd_index->indkey.values[keyno] != indexRelation->rd_index->indkey.values[keyno])
			return false;
	}

	/*
	 * The list_difference() of the arbiter's index expressions against the
	 * candidate's must come back empty ...
	 */
	leftover = list_difference(RelationGetIndexExpressions(arbiterIndexRelation),
							   RelationGetIndexExpressions(indexRelation));
	if (leftover != NIL)
		return false;

	/* ... and likewise for the partial-index predicates. */
	leftover = list_difference(RelationGetIndexPredicate(arbiterIndexRelation),
							   RelationGetIndexPredicate(indexRelation));

	return leftover == NIL;
}
/*
* ExecInitPartitionInfo
* Lock the partition and initialize ResultRelInfo. Also setup other
@ -689,45 +749,117 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
{
TupleDesc partrelDesc = RelationGetDescr(partrel);
ExprContext *econtext = mtstate->ps.ps_ExprContext;
ListCell *lc;
List *arbiterIndexes = NIL;
int additional_arbiters = 0;
/*
* If there is a list of arbiter indexes, map it to a list of indexes
* in the partition. We do that by scanning the partition's index
* list and searching for ancestry relationships to each index in the
* ancestor table.
* in the partition. We also add any "identical indexes" to any of
* those, to cover the case where one of them is concurrently being
* reindexed.
*/
if (rootResultRelInfo->ri_onConflictArbiterIndexes != NIL)
{
List *childIdxs;
List *unparented_idxs = NIL,
*arbiters_listidxs = NIL;
childIdxs = RelationGetIndexList(leaf_part_rri->ri_RelationDesc);
foreach(lc, childIdxs)
for (int listidx = 0; listidx < leaf_part_rri->ri_NumIndices; listidx++)
{
Oid childIdx = lfirst_oid(lc);
Oid indexoid;
List *ancestors;
ListCell *lc2;
ancestors = get_partition_ancestors(childIdx);
foreach(lc2, rootResultRelInfo->ri_onConflictArbiterIndexes)
/*
* If one of this index's ancestors is in the root's arbiter
* list, then use this index as arbiter for this partition.
* Otherwise, if this index has no parent, track it for later,
* in case REINDEX CONCURRENTLY is working on one of the
* arbiters.
*
* XXX get_partition_ancestors is slow: it scans pg_inherits
* each time. Consider a syscache or some other way to cache?
*/
indexoid = RelationGetRelid(leaf_part_rri->ri_IndexRelationDescs[listidx]);
ancestors = get_partition_ancestors(indexoid);
if (ancestors != NIL)
{
if (list_member_oid(ancestors, lfirst_oid(lc2)))
arbiterIndexes = lappend_oid(arbiterIndexes, childIdx);
foreach_oid(parent_idx, rootResultRelInfo->ri_onConflictArbiterIndexes)
{
if (list_member_oid(ancestors, parent_idx))
{
arbiterIndexes = lappend_oid(arbiterIndexes, indexoid);
arbiters_listidxs = lappend_int(arbiters_listidxs, listidx);
break;
}
}
}
else
unparented_idxs = lappend_int(unparented_idxs, listidx);
list_free(ancestors);
}
/*
* If we found any indexes with no ancestors, it's possible that
* some arbiter index is undergoing concurrent reindex. Match all
* unparented indexes against arbiters; add unparented matching
* ones as "additional arbiters".
*
* This is critical so that all concurrent transactions use the
* same set as arbiters during REINDEX CONCURRENTLY, to avoid
* spurious "duplicate key" errors.
*/
if (unparented_idxs && arbiterIndexes)
{
foreach_int(unparented_i, unparented_idxs)
{
Relation unparented_rel;
IndexInfo *unparenred_ii;
unparented_rel = leaf_part_rri->ri_IndexRelationDescs[unparented_i];
unparenred_ii = leaf_part_rri->ri_IndexRelationInfo[unparented_i];
Assert(!list_member_oid(arbiterIndexes,
unparented_rel->rd_index->indexrelid));
/* Ignore indexes not ready */
if (!unparenred_ii->ii_ReadyForInserts)
continue;
foreach_int(arbiter_i, arbiters_listidxs)
{
Relation arbiter_rel;
IndexInfo *arbiter_ii;
arbiter_rel = leaf_part_rri->ri_IndexRelationDescs[arbiter_i];
arbiter_ii = leaf_part_rri->ri_IndexRelationInfo[arbiter_i];
/*
* If the non-ancestor index is compatible with the
* arbiter, use the non-ancestor as arbiter too.
*/
if (IsIndexCompatibleAsArbiter(arbiter_rel,
arbiter_ii,
unparented_rel,
unparenred_ii))
{
arbiterIndexes = lappend_oid(arbiterIndexes,
unparented_rel->rd_index->indexrelid);
additional_arbiters++;
break;
}
}
}
}
list_free(unparented_idxs);
list_free(arbiters_listidxs);
}
/*
* If the resulting lists are of inequal length, something is wrong.
* XXX This may happen because we don't match the lists correctly when
* a partitioned index is being processed by REINDEX CONCURRENTLY.
* FIXME later.
* We expect to find as many arbiter indexes on this partition as the
* root has, plus however many "additional arbiters" (to wit: those
* being concurrently rebuilt) we found.
*/
if (list_length(rootResultRelInfo->ri_onConflictArbiterIndexes) !=
list_length(arbiterIndexes))
list_length(arbiterIndexes) - additional_arbiters)
elog(ERROR, "invalid arbiter index list");
leaf_part_rri->ri_onConflictArbiterIndexes = arbiterIndexes;

@ -18,9 +18,10 @@ ISOLATION = basic \
inplace \
syscache-update-pruned \
index-concurrently-upsert \
index-concurrently-upsert-predicate \
reindex-concurrently-upsert \
reindex-concurrently-upsert-on-constraint \
index-concurrently-upsert-predicate
reindex-concurrently-upsert-partitioned
TAP_TESTS = 1

@ -0,0 +1,238 @@
Parsed test spec with 4 sessions
starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s4_wakeup_to_set_dead s2_start_upsert s4_wakeup_s1 s4_wakeup_s2
injection_points_attach
-----------------------
(1 row)
injection_points_attach
-----------------------
(1 row)
injection_points_set_local
--------------------------
(1 row)
step s3_setup_wait_before_set_dead:
SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait');
injection_points_attach
-----------------------
(1 row)
step s3_start_reindex:
REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey;
<waiting ...>
step s1_start_upsert:
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
<waiting ...>
step s4_wakeup_to_set_dead:
SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead');
SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s2_start_upsert:
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
<waiting ...>
step s4_wakeup_s1:
SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict');
SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s1_start_upsert: <... completed>
step s4_wakeup_s2:
SELECT injection_points_detach('exec-insert-before-insert-speculative');
SELECT injection_points_wakeup('exec-insert-before-insert-speculative');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s2_start_upsert: <... completed>
step s3_start_reindex: <... completed>
starting permutation: s3_setup_wait_before_swap s3_start_reindex s1_start_upsert s4_wakeup_to_swap s2_start_upsert s4_wakeup_s2 s4_wakeup_s1
injection_points_attach
-----------------------
(1 row)
injection_points_attach
-----------------------
(1 row)
injection_points_set_local
--------------------------
(1 row)
step s3_setup_wait_before_swap:
SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait');
injection_points_attach
-----------------------
(1 row)
step s3_start_reindex:
REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey;
<waiting ...>
step s1_start_upsert:
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
<waiting ...>
step s4_wakeup_to_swap:
SELECT injection_points_detach('reindex-relation-concurrently-before-swap');
SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s2_start_upsert:
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
<waiting ...>
step s4_wakeup_s2:
SELECT injection_points_detach('exec-insert-before-insert-speculative');
SELECT injection_points_wakeup('exec-insert-before-insert-speculative');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s4_wakeup_s1:
SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict');
SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s1_start_upsert: <... completed>
step s2_start_upsert: <... completed>
step s3_start_reindex: <... completed>
starting permutation: s3_setup_wait_before_set_dead s3_start_reindex s1_start_upsert s2_start_upsert s4_wakeup_s1 s4_wakeup_to_set_dead s4_wakeup_s2
injection_points_attach
-----------------------
(1 row)
injection_points_attach
-----------------------
(1 row)
injection_points_set_local
--------------------------
(1 row)
step s3_setup_wait_before_set_dead:
SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait');
injection_points_attach
-----------------------
(1 row)
step s3_start_reindex:
REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey;
<waiting ...>
step s1_start_upsert:
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
<waiting ...>
step s2_start_upsert:
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
<waiting ...>
step s4_wakeup_s1:
SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict');
SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s1_start_upsert: <... completed>
step s4_wakeup_to_set_dead:
SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead');
SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s4_wakeup_s2:
SELECT injection_points_detach('exec-insert-before-insert-speculative');
SELECT injection_points_wakeup('exec-insert-before-insert-speculative');
injection_points_detach
-----------------------
(1 row)
injection_points_wakeup
-----------------------
(1 row)
step s2_start_upsert: <... completed>
step s3_start_reindex: <... completed>

@ -49,9 +49,10 @@ tests += {
'inplace',
'syscache-update-pruned',
'index-concurrently-upsert',
'index-concurrently-upsert-predicate',
'reindex-concurrently-upsert',
'reindex-concurrently-upsert-on-constraint',
'index-concurrently-upsert-predicate',
'reindex-concurrently-upsert-partitioned',
],
'runningcheck': false, # see syscache-update-pruned
# Some tests wait for all snapshots, so avoid parallel execution

@ -0,0 +1,113 @@
# This test verifies INSERT ON CONFLICT DO UPDATE behavior on partitioned
# tables concurrent with REINDEX CONCURRENTLY.
#
# - s1: UPSERT a tuple
# - s2: UPSERT the same tuple
# - s3: REINDEX CONCURRENTLY the partition's primary key index
#
# - s4: controls concurrency via injection points
# Global setup: install the injection_points extension and create a
# range-partitioned table test.tbl with a primary key on "i" and a single
# partition, test.tbl_partition, covering values 0 up to 10000.  The value
# upserted by s1 and s2 (13) falls into that partition.
setup
{
CREATE EXTENSION injection_points;
CREATE SCHEMA test;
CREATE TABLE test.tbl(i int primary key, updated_at timestamp) PARTITION BY RANGE (i);
CREATE TABLE test.tbl_partition PARTITION OF test.tbl
FOR VALUES FROM (0) TO (10000)
WITH (parallel_workers = 0);
}
# Global teardown: drop the schema (and with it the tables) and the
# extension created by the setup block.
teardown
{
DROP SCHEMA test CASCADE;
DROP EXTENSION injection_points;
}
# Session s1: first upserter.  Its setup calls injection_points_set_local()
# and attaches a 'wait' action to the injection point
# check-exclusion-or-unique-constraint-no-conflict, so s1_start_upsert
# blocks at that point until s4_wakeup_s1 detaches and wakes it.
session s1
setup
{
SELECT injection_points_set_local();
SELECT injection_points_attach('check-exclusion-or-unique-constraint-no-conflict', 'wait');
}
# Upsert key 13 through the partitioned parent table.
step s1_start_upsert
{
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
}
# Session s2: second upserter, running the same statement as s1.  Its setup
# calls injection_points_set_local() and attaches a 'wait' action to the
# injection point exec-insert-before-insert-speculative, so s2_start_upsert
# blocks at that point until s4_wakeup_s2 detaches and wakes it.
session s2
setup
{
SELECT injection_points_set_local();
SELECT injection_points_attach('exec-insert-before-insert-speculative', 'wait');
}
# Upsert the same key (13) that s1 upserts.
step s2_start_upsert
{
INSERT INTO test.tbl VALUES (13, now()) ON CONFLICT (i) DO UPDATE SET updated_at = now();
}
# Session s3: runs REINDEX CONCURRENTLY on the partition's primary key
# index.  Each permutation first picks which injection point the REINDEX
# should wait at, by running one of the two s3_setup_* steps.
session s3
setup
{
SELECT injection_points_set_local();
}
# Make the REINDEX wait at reindex-relation-concurrently-before-set-dead.
step s3_setup_wait_before_set_dead
{
SELECT injection_points_attach('reindex-relation-concurrently-before-set-dead', 'wait');
}
# Make the REINDEX wait at reindex-relation-concurrently-before-swap.
step s3_setup_wait_before_swap
{
SELECT injection_points_attach('reindex-relation-concurrently-before-swap', 'wait');
}
# Start the concurrent rebuild of test.tbl_partition_pkey; it blocks at
# whichever injection point was attached above.
step s3_start_reindex
{
REINDEX INDEX CONCURRENTLY test.tbl_partition_pkey;
}
# Session s4: controller.  It attaches no injection points of its own; each
# step detaches one injection point and then wakes up the session that is
# waiting on it, letting that session continue.
session s4
# Release the REINDEX waiting at reindex-relation-concurrently-before-swap.
step s4_wakeup_to_swap
{
SELECT injection_points_detach('reindex-relation-concurrently-before-swap');
SELECT injection_points_wakeup('reindex-relation-concurrently-before-swap');
}
# Release s1, which waits at check-exclusion-or-unique-constraint-no-conflict.
step s4_wakeup_s1
{
SELECT injection_points_detach('check-exclusion-or-unique-constraint-no-conflict');
SELECT injection_points_wakeup('check-exclusion-or-unique-constraint-no-conflict');
}
# Release s2, which waits at exec-insert-before-insert-speculative.
step s4_wakeup_s2
{
SELECT injection_points_detach('exec-insert-before-insert-speculative');
SELECT injection_points_wakeup('exec-insert-before-insert-speculative');
}
# Release the REINDEX waiting at reindex-relation-concurrently-before-set-dead.
step s4_wakeup_to_set_dead
{
SELECT injection_points_detach('reindex-relation-concurrently-before-set-dead');
SELECT injection_points_wakeup('reindex-relation-concurrently-before-set-dead');
}
# NOTE(review): the parenthesized step names after a step are
# isolationtester blocker annotations; presumably they keep that step's
# completion from being reported before the listed steps complete, which
# keeps the expected output stable -- confirm against the isolation README.

# Permutation 1: REINDEX waits at the "before-set-dead" point.  s1 starts
# its upsert while the REINDEX is paused, the REINDEX is released, s2 then
# starts its upsert, and finally s1 and s2 are released in that order.
permutation
s3_setup_wait_before_set_dead
s3_start_reindex(s1_start_upsert, s2_start_upsert)
s1_start_upsert
s4_wakeup_to_set_dead
s2_start_upsert(s1_start_upsert)
s4_wakeup_s1
s4_wakeup_s2
# Permutation 2: REINDEX waits at the "before-swap" point.  s1 starts its
# upsert while the REINDEX is paused, the REINDEX is released, s2 then
# starts its upsert, and finally s2 is released before s1.
permutation
s3_setup_wait_before_swap
s3_start_reindex(s1_start_upsert, s2_start_upsert)
s1_start_upsert
s4_wakeup_to_swap
s2_start_upsert(s1_start_upsert)
s4_wakeup_s2
s4_wakeup_s1
# Permutation 3: REINDEX waits at the "before-set-dead" point.  Both s1 and
# s2 start their upserts while the REINDEX is still paused; s1 is released
# first, then the REINDEX, then s2.
permutation
s3_setup_wait_before_set_dead
s3_start_reindex(s1_start_upsert, s2_start_upsert)
s1_start_upsert
s2_start_upsert(s1_start_upsert)
s4_wakeup_s1
s4_wakeup_to_set_dead
s4_wakeup_s2
Loading…
Cancel
Save