@ -40,35 +40,72 @@ $node_subscriber->safe_psql('postgres', $ddl);
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE PUBLICATION tpub FOR ALL TABLES" ) ;
# ------------------------------------------------------
# Ensure binary mode also executes COPY in binary format
# ------------------------------------------------------
# Insert some content before creating a subscription
$ node_publisher - > safe_psql (
'postgres' , qq(
INSERT INTO public . test_numerical ( a , b , c , d ) VALUES
( 1 , 1.2 , 1.3 , 10 ) ,
( 2 , 2.2 , 2.3 , 20 ) ;
INSERT INTO public . test_arrays ( a , b , c ) VALUES
( '{1,2,3}' , '{1.1, 1.2, 1.3}' , '{"one", "two", "three"}' ) ,
( '{3,1,2}' , '{1.3, 1.1, 1.2}' , '{"three", "one", "two"}' ) ;
) ) ;
my $ publisher_connstring = $ node_publisher - > connstr . ' dbname=postgres' ;
$ node_subscriber - > safe_psql ( 'postgres' ,
"CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)" ) ;
# Ensure the COPY command is executed in binary format on the publisher
$ node_publisher - > wait_for_log (
qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/
) ;
# Ensure nodes are in sync with each other
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , 'tsub' ) ;
my $ sync_check = qq(
SELECT a , b , c , d FROM test_numerical ORDER BY a ;
SELECT a , b , c FROM test_arrays ORDER BY a ;
) ;
# Check the synced data on the subscriber
my $ result = $ node_subscriber - > safe_psql ( 'postgres' , $ sync_check ) ;
is ( $ result , ' 1 | 1.2 | 1.3 | 10
2 | 2.2 | 2.3 | 20
{ 1 , 2 , 3 } | { 1.1 , 1.2 , 1.3 } | { one , two , three }
{ 3 , 1 , 2 } | { 1.3 , 1.1 , 1.2 } | { three , one , two } ', ' check synced data on subscriber ' ) ;
# ----------------------------------
# Ensure apply works in binary mode
# ----------------------------------
# Insert some content and make sure it's replicated across
$ node_publisher - > safe_psql (
'postgres' , qq(
INSERT INTO public . test_arrays ( a , b , c ) VALUES
( '{1,2,3}' , '{1.1, 1.2, 1.3}' , '{"one", "two", "three"}' ) ,
( '{3,1,2}' , '{1.3, 1.1, 1.2}' , '{"three", "one", "two"}' ) ;
( '{2,1 ,3}' , '{1.2, 1.1 , 1.3}' , '{"tw o", "one ", "three"}' ) ,
( '{1,3 ,2}' , '{1.1, 1.3, 1.2}' , '{"one", "thre e", "two"}' ) ;
INSERT INTO public . test_numerical ( a , b , c , d ) VALUES
( 1 , 1.2 , 1.3 , 10 ) ,
( 2 , 2.2 , 2.3 , 20 ) ,
( 3 , 3.2 , 3.3 , 30 ) ;
( 3 , 3.2 , 3.3 , 30 ) ,
( 4 , 4.2 , 4.3 , 40 ) ;
) ) ;
$ node_publisher - > wait_for_catchup ( 'tsub' ) ;
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT a, b, c, d FROM test_numerical ORDER BY a" ) ;
is ( $ result , ' 1 | 1.2 | 1.3 | 10
2 | 2.2 | 2.3 | 20
3 | 3.2 | 3.3 | 30 ', ' check replicated data on subscriber ' ) ;
3 | 3.2 | 3.3 | 30
4 | 4.2 | 4.3 | 40 ', ' check replicated data on subscriber ' ) ;
# Test updates as well
$ node_publisher - > safe_psql (
@ -83,6 +120,8 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c FROM test_arrays ORDER BY a" ) ;
is ( $ result , ' { 1 , 2 , 3 } | { 42 , 1.2 , 1.3 } |
{ 1 , 3 , 2 } | { 42 , 1.3 , 1.2 } |
{ 2 , 1 , 3 } | { 42 , 1.1 , 1.3 } |
{ 3 , 1 , 2 } | { 42 , 1.1 , 1.2 } | ', ' check updated replicated data on subscriber ' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
@ -90,7 +129,12 @@ $result = $node_subscriber->safe_psql('postgres',
is ( $ result , ' 1 | 42 || 10
2 | 42 || 20
3 | 42 || 30 ', ' check updated replicated data on subscriber ' ) ;
3 | 42 || 30
4 | 42 || 40 ', ' check updated replicated data on subscriber ' ) ;
# ------------------------------------------------------------------------------
# Use ALTER SUBSCRIPTION to change to text format and then back to binary format
# ------------------------------------------------------------------------------
# Test to reset back to text formatting, and then to binary again
$ node_subscriber - > safe_psql ( 'postgres' ,
@ -99,7 +143,7 @@ $node_subscriber->safe_psql('postgres',
$ node_publisher - > safe_psql (
'postgres' , qq(
INSERT INTO public . test_numerical ( a , b , c , d ) VALUES
( 4 , 4.2 , 4.3 , 4 0) ;
( 5 , 5.2 , 5.3 , 5 0) ;
) ) ;
$ node_publisher - > wait_for_catchup ( 'tsub' ) ;
@ -110,7 +154,8 @@ $result = $node_subscriber->safe_psql('postgres',
is ( $ result , ' 1 | 42 || 10
2 | 42 || 20
3 | 42 || 30
4 | 4.2 | 4.3 | 40 ', ' check replicated data on subscriber ' ) ;
4 | 42 || 40
5 | 5.2 | 5.3 | 50 ', ' check replicated data on subscriber ' ) ;
$ node_subscriber - > safe_psql ( 'postgres' ,
"ALTER SUBSCRIPTION tsub SET (binary = true);" ) ;
@ -127,9 +172,124 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c FROM test_arrays ORDER BY a" ) ;
is ( $ result , ' { 1 , 2 , 3 } | { 42 , 1.2 , 1.3 } |
{ 1 , 3 , 2 } | { 42 , 1.3 , 1.2 } |
{ 2 , 1 , 3 } | { 42 , 1.1 , 1.3 } |
{ 2 , 3 , 1 } | { 1.2 , 1.3 , 1.1 } | { two , three , one }
{ 3 , 1 , 2 } | { 42 , 1.1 , 1.2 } | ', ' check replicated data on subscriber ' ) ;
# ---------------------------------------------------------------
# Test binary replication without and with send/receive functions
# ---------------------------------------------------------------
# Create a custom type without send/rcv functions
$ ddl = qq(
CREATE TYPE myvarchar ;
CREATE FUNCTION myvarcharin ( cstring , oid , integer ) RETURNS myvarchar
LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin' ;
CREATE FUNCTION myvarcharout ( myvarchar ) RETURNS cstring
LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout' ;
CREATE TYPE myvarchar (
input = myvarcharin ,
output = myvarcharout ) ;
CREATE TABLE public . test_myvarchar (
a myvarchar
) ; ) ;
$ node_publisher - > safe_psql ( 'postgres' , $ ddl ) ;
$ node_subscriber - > safe_psql ( 'postgres' , $ ddl ) ;
# Insert some initial data
$ node_publisher - > safe_psql (
'postgres' , qq(
INSERT INTO public . test_myvarchar ( a ) VALUES
( 'a' ) ;
) ) ;
# Check the subscriber log from now on.
my $ offset = - s $ node_subscriber - > logfile ;
# Refresh the publication to trigger the tablesync
$ node_subscriber - > safe_psql ( 'postgres' ,
"ALTER SUBSCRIPTION tsub REFRESH PUBLICATION" ) ;
# It should fail
$ node_subscriber - > wait_for_log (
qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/ ,
$ offset ) ;
# Create and set send/rcv functions for the custom type
$ ddl = qq(
CREATE FUNCTION myvarcharsend ( myvarchar ) RETURNS bytea
LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend' ;
CREATE FUNCTION myvarcharrecv ( internal , oid , integer ) RETURNS myvarchar
LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv' ;
ALTER TYPE myvarchar SET (
send = myvarcharsend ,
receive = myvarcharrecv
) ; ) ;
$ node_publisher - > safe_psql ( 'postgres' , $ ddl ) ;
$ node_subscriber - > safe_psql ( 'postgres' , $ ddl ) ;
# Now tablesync should succeed
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , 'tsub' ) ;
# Check the synced data on the subscriber
$ result =
$ node_subscriber - > safe_psql ( 'postgres' , 'SELECT a FROM test_myvarchar;' ) ;
is ( $ result , 'a' , 'check synced data on subscriber with custom type' ) ;
# -----------------------------------------------------
# Test mismatched column types with/without binary mode
# -----------------------------------------------------
# Test syncing tables with mismatching column types
$ node_publisher - > safe_psql (
'postgres' , qq(
CREATE TABLE public . test_mismatching_types (
a bigint PRIMARY KEY
) ;
INSERT INTO public . test_mismatching_types ( a )
VALUES ( 1 ) , ( 2 ) ;
) ) ;
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
$ node_subscriber - > safe_psql (
'postgres' , qq(
CREATE TABLE public . test_mismatching_types (
a int PRIMARY KEY
) ;
ALTER SUBSCRIPTION tsub REFRESH PUBLICATION ;
) ) ;
# Cannot sync due to type mismatch
$ node_subscriber - > wait_for_log (
qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/ , $ offset ) ;
# Check the publisher log from now on.
$ offset = - s $ node_publisher - > logfile ;
# Setting binary to false should allow syncing
$ node_subscriber - > safe_psql (
'postgres' , qq(
ALTER SUBSCRIPTION tsub SET ( binary = false ) ; ) ) ;
# Ensure the COPY command is executed in text format on the publisher
$ node_publisher - > wait_for_log (
qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/ , $ offset ) ;
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , 'tsub' ) ;
# Check the synced data on the subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
'SELECT a FROM test_mismatching_types ORDER BY a;' ) ;
is ( $ result , ' 1
2 ', ' check synced data on subscriber with binary = false ' ) ;
$ node_subscriber - > stop ( 'fast' ) ;
$ node_publisher - > stop ( 'fast' ) ;