mirror of https://github.com/postgres/postgres
Commit 216a784829 introduced parallel apply workers, allowing multiple
processes to share a replication origin. To support this,
replorigin_session_setup() was extended to accept a pid argument
identifying the process using the origin.
This commit exposes that capability through the SQL interface function
pg_replication_origin_session_setup() by adding an optional pid parameter.
This enables multiple processes to coordinate replication using the same
origin when using SQL-level replication functions.
This change allows the non-builtin logical replication solutions to
implement parallel apply for large transactions.
Additionally, an existing internal error was made user-facing, as it can
now be triggered via the exposed SQL API.
Author: Doruk Yilmaz <doruk@mixrank.com>
Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com
Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com
pull/241/head
parent
8aac5923a3
commit
5b148706c5
@ -0,0 +1,79 @@ |
||||
Parsed test spec with 2 sessions |
||||
|
||||
starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset |
||||
step s0_setup: SELECT pg_replication_origin_session_setup('origin'); |
||||
pg_replication_origin_session_setup |
||||
----------------------------------- |
||||
|
||||
(1 row) |
||||
|
||||
step s0_is_setup: SELECT pg_replication_origin_session_is_setup(); |
||||
pg_replication_origin_session_is_setup |
||||
-------------------------------------- |
||||
t |
||||
(1 row) |
||||
|
||||
step s1_setup: |
||||
SELECT pg_replication_origin_session_setup('origin', pid) |
||||
FROM pg_stat_activity |
||||
WHERE application_name = 'isolation/parallel_session_origin/s0'; |
||||
|
||||
pg_replication_origin_session_setup |
||||
----------------------------------- |
||||
|
||||
(1 row) |
||||
|
||||
step s1_is_setup: SELECT pg_replication_origin_session_is_setup(); |
||||
pg_replication_origin_session_is_setup |
||||
-------------------------------------- |
||||
t |
||||
(1 row) |
||||
|
||||
step s0_add_message: |
||||
SELECT 1 |
||||
FROM pg_logical_emit_message(true, 'prefix', 'message on s0'); |
||||
|
||||
?column? |
||||
-------- |
||||
1 |
||||
(1 row) |
||||
|
||||
step s0_store_lsn: |
||||
INSERT INTO local_lsn_store |
||||
SELECT 0, local_lsn FROM pg_replication_origin_status; |
||||
|
||||
step s1_add_message: |
||||
SELECT 1 |
||||
FROM pg_logical_emit_message(true, 'prefix', 'message on s1'); |
||||
|
||||
?column? |
||||
-------- |
||||
1 |
||||
(1 row) |
||||
|
||||
step s1_store_lsn: |
||||
INSERT INTO local_lsn_store |
||||
SELECT 1, local_lsn FROM pg_replication_origin_status; |
||||
|
||||
step s0_compare: |
||||
SELECT s0.lsn < s1.lsn |
||||
FROM local_lsn_store as s0, local_lsn_store as s1 |
||||
WHERE s0.session = 0 AND s1.session = 1; |
||||
|
||||
?column? |
||||
-------- |
||||
t |
||||
(1 row) |
||||
|
||||
step s0_reset: SELECT pg_replication_origin_session_reset(); |
||||
pg_replication_origin_session_reset |
||||
----------------------------------- |
||||
|
||||
(1 row) |
||||
|
||||
step s1_reset: SELECT pg_replication_origin_session_reset(); |
||||
pg_replication_origin_session_reset |
||||
----------------------------------- |
||||
|
||||
(1 row) |
||||
|
||||
@ -0,0 +1,56 @@ |
||||
# Test parallel replication origin manipulations; ensure local_lsn can be |
||||
# updated by all attached sessions. |
||||
|
||||
setup |
||||
{ |
||||
SELECT pg_replication_origin_create('origin'); |
||||
CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn); |
||||
} |
||||
|
||||
teardown |
||||
{ |
||||
SELECT pg_replication_origin_drop('origin'); |
||||
DROP TABLE local_lsn_store; |
||||
} |
||||
|
||||
session "s0" |
||||
setup { SET synchronous_commit = on; } |
||||
step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); } |
||||
step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); } |
||||
step "s0_add_message" { |
||||
SELECT 1 |
||||
FROM pg_logical_emit_message(true, 'prefix', 'message on s0'); |
||||
} |
||||
step "s0_store_lsn" { |
||||
INSERT INTO local_lsn_store |
||||
SELECT 0, local_lsn FROM pg_replication_origin_status; |
||||
} |
||||
step "s0_compare" { |
||||
SELECT s0.lsn < s1.lsn |
||||
FROM local_lsn_store as s0, local_lsn_store as s1 |
||||
WHERE s0.session = 0 AND s1.session = 1; |
||||
} |
||||
step "s0_reset" { SELECT pg_replication_origin_session_reset(); } |
||||
|
||||
session "s1" |
||||
setup { SET synchronous_commit = on; } |
||||
step "s1_setup" { |
||||
SELECT pg_replication_origin_session_setup('origin', pid) |
||||
FROM pg_stat_activity |
||||
WHERE application_name = 'isolation/parallel_session_origin/s0'; |
||||
} |
||||
step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); } |
||||
step "s1_add_message" { |
||||
SELECT 1 |
||||
FROM pg_logical_emit_message(true, 'prefix', 'message on s1'); |
||||
} |
||||
step "s1_store_lsn" { |
||||
INSERT INTO local_lsn_store |
||||
SELECT 1, local_lsn FROM pg_replication_origin_status; |
||||
} |
||||
step "s1_reset" { SELECT pg_replication_origin_session_reset(); } |
||||
|
||||
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions |
||||
# commits a transaction and store the local_lsn of the replication origin. |
||||
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn. |
||||
permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset" |
||||
Loading…
Reference in new issue