@ -32,59 +32,72 @@ PG_MODULE_MAGIC;
void _PG_init ( void ) ;
/* Current connection to the primary, if any */
static PGconn * streamConn = NULL ;
/* Buffer for currently read records */
static char * recvBuf = NULL ;
struct WalReceiverConn
{
/* Current connection to the primary, if any */
PGconn * streamConn ;
/* Used to remember if the connection is logical or physical */
bool logical ;
/* Buffer for currently read records */
char * recvBuf ;
} ;
/* Prototypes for interface functions */
static void libpqrcv_connect ( char * conninfo ) ;
static char * libpqrcv_get_conninfo ( void ) ;
static void libpqrcv_identify_system ( TimeLineID * primary_tli ) ;
static void libpqrcv_readtimelinehistoryfile ( TimeLineID tli , char * * filename , char * * content , int * len ) ;
static bool libpqrcv_startstreaming ( TimeLineID tli , XLogRecPtr startpoint ,
char * slotname ) ;
static void libpqrcv_endstreaming ( TimeLineID * next_tli ) ;
static int libpqrcv_receive ( char * * buffer , pgsocket * wait_fd ) ;
static void libpqrcv_send ( const char * buffer , int nbytes ) ;
static void libpqrcv_disconnect ( void ) ;
static WalReceiverConn * libpqrcv_connect ( const char * conninfo ,
bool logical , const char * appname ) ;
static char * libpqrcv_get_conninfo ( WalReceiverConn * conn ) ;
static char * libpqrcv_identify_system ( WalReceiverConn * conn ,
TimeLineID * primary_tli ) ;
static void libpqrcv_readtimelinehistoryfile ( WalReceiverConn * conn ,
TimeLineID tli , char * * filename ,
char * * content , int * len ) ;
static bool libpqrcv_startstreaming ( WalReceiverConn * conn ,
TimeLineID tli , XLogRecPtr startpoint ,
const char * slotname ) ;
static void libpqrcv_endstreaming ( WalReceiverConn * conn ,
TimeLineID * next_tli ) ;
static int libpqrcv_receive ( WalReceiverConn * conn , char * * buffer ,
pgsocket * wait_fd ) ;
static void libpqrcv_send ( WalReceiverConn * conn , const char * buffer ,
int nbytes ) ;
static void libpqrcv_disconnect ( WalReceiverConn * conn ) ;
static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect ,
libpqrcv_get_conninfo ,
libpqrcv_identify_system ,
libpqrcv_readtimelinehistoryfile ,
libpqrcv_startstreaming ,
libpqrcv_endstreaming ,
libpqrcv_receive ,
libpqrcv_send ,
libpqrcv_disconnect
} ;
/* Prototypes for private functions */
static PGresult * libpqrcv_PQexec ( const char * query ) ;
static PGresult * libpqrcv_PQexec ( PGconn * streamConn , const char * query ) ;
/*
* Module load callback
* Module initialization function
*/
void
_PG_init ( void )
{
/* Tell walreceiver how to reach us */
if ( walrcv_connect ! = NULL | | walrcv_identify_system ! = NULL | |
walrcv_readtimelinehistoryfile ! = NULL | |
walrcv_startstreaming ! = NULL | | walrcv_endstreaming ! = NULL | |
walrcv_receive ! = NULL | | walrcv_send ! = NULL | |
walrcv_disconnect ! = NULL )
if ( WalReceiverFunctions ! = NULL )
elog ( ERROR , " libpqwalreceiver already loaded " ) ;
walrcv_connect = libpqrcv_connect ;
walrcv_get_conninfo = libpqrcv_get_conninfo ;
walrcv_identify_system = libpqrcv_identify_system ;
walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile ;
walrcv_startstreaming = libpqrcv_startstreaming ;
walrcv_endstreaming = libpqrcv_endstreaming ;
walrcv_receive = libpqrcv_receive ;
walrcv_send = libpqrcv_send ;
walrcv_disconnect = libpqrcv_disconnect ;
WalReceiverFunctions = & PQWalReceiverFunctions ;
}
/*
* Establish the connection to the primary server for XLOG streaming
*/
static void
libpqrcv_connect ( char * conninfo )
static WalReceiverConn *
libpqrcv_connect ( const char * conninfo , bool logical , const char * appname )
{
WalReceiverConn * conn ;
const char * keys [ 5 ] ;
const char * vals [ 5 ] ;
int i = 0 ;
/*
* We use the expand_dbname parameter to process the connection string ( or
@ -93,22 +106,29 @@ libpqrcv_connect(char *conninfo)
* database name is ignored by the server in replication mode , but specify
* " replication " for . pgpass lookup .
*/
keys [ 0 ] = " dbname " ;
vals [ 0 ] = conninfo ;
keys [ 1 ] = " replication " ;
vals [ 1 ] = " true " ;
keys [ 2 ] = " dbname " ;
vals [ 2 ] = " replication " ;
keys [ 3 ] = " fallback_application_name " ;
vals [ 3 ] = " walreceiver " ;
keys [ 4 ] = NULL ;
vals [ 4 ] = NULL ;
streamConn = PQconnectdbParams ( keys , vals , /* expand_dbname = */ true ) ;
if ( PQstatus ( streamConn ) ! = CONNECTION_OK )
keys [ i ] = " dbname " ;
vals [ i ] = conninfo ;
keys [ + + i ] = " replication " ;
vals [ i ] = logical ? " database " : " true " ;
if ( ! logical )
{
keys [ + + i ] = " dbname " ;
vals [ i ] = " replication " ;
}
keys [ + + i ] = " fallback_application_name " ;
vals [ i ] = appname ;
keys [ + + i ] = NULL ;
vals [ i ] = NULL ;
conn = palloc0 ( sizeof ( WalReceiverConn ) ) ;
conn - > streamConn = PQconnectdbParams ( keys , vals , /* expand_dbname = */ true ) ;
if ( PQstatus ( conn - > streamConn ) ! = CONNECTION_OK )
ereport ( ERROR ,
( errmsg ( " could not connect to the primary server: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
conn - > logical = logical ;
return conn ;
}
/*
@ -116,17 +136,17 @@ libpqrcv_connect(char *conninfo)
* are obfuscated .
*/
static char *
libpqrcv_get_conninfo ( void )
libpqrcv_get_conninfo ( WalReceiverConn * conn )
{
PQconninfoOption * conn_opts ;
PQconninfoOption * conn_opt ;
PQExpBufferData buf ;
char * retval ;
Assert ( streamConn ! = NULL ) ;
Assert ( conn - > streamConn ! = NULL ) ;
initPQExpBuffer ( & buf ) ;
conn_opts = PQconninfo ( streamConn ) ;
conn_opts = PQconninfo ( conn - > streamConn ) ;
if ( conn_opts = = NULL )
ereport ( ERROR ,
@ -164,25 +184,24 @@ libpqrcv_get_conninfo(void)
* Check that primary ' s system identifier matches ours , and fetch the current
* timeline ID of the primary .
*/
static void
libpqrcv_identify_system ( TimeLineID * primary_tli )
static char *
libpqrcv_identify_system ( WalReceiverConn * conn , TimeLineID * primary_tli )
{
PGresult * res ;
char * primary_sysid ;
char standby_sysid [ 32 ] ;
/*
* Get the system identifier and timeline ID as a DataRow message from the
* primary server .
*/
res = libpqrcv_PQexec ( " IDENTIFY_SYSTEM " ) ;
res = libpqrcv_PQexec ( conn - > streamConn , " IDENTIFY_SYSTEM " ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
PQclear ( res ) ;
ereport ( ERROR ,
( errmsg ( " could not receive database system identifier and timeline ID from "
" the primary server: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
}
if ( PQnfields ( res ) < 3 | | PQntuples ( res ) ! = 1 )
{
@ -195,24 +214,11 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
errdetail ( " Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields. " ,
ntuples , nfields , 3 , 1 ) ) ) ;
}
primary_sysid = PQgetvalue ( res , 0 , 0 ) ;
primary_sysid = pstrdup ( PQgetvalue ( res , 0 , 0 ) ) ;
* primary_tli = pg_atoi ( PQgetvalue ( res , 0 , 1 ) , 4 , 0 ) ;
/*
* Confirm that the system identifier of the primary is the same as ours .
*/
snprintf ( standby_sysid , sizeof ( standby_sysid ) , UINT64_FORMAT ,
GetSystemIdentifier ( ) ) ;
if ( strcmp ( primary_sysid , standby_sysid ) ! = 0 )
{
primary_sysid = pstrdup ( primary_sysid ) ;
PQclear ( res ) ;
ereport ( ERROR ,
( errmsg ( " database system identifier differs between the primary and standby " ) ,
errdetail ( " The primary's identifier is %s, the standby's identifier is %s. " ,
primary_sysid , standby_sysid ) ) ) ;
}
PQclear ( res ) ;
return primary_sysid ;
}
/*
@ -226,21 +232,30 @@ libpqrcv_identify_system(TimeLineID *primary_tli)
* throws an ERROR .
*/
static bool
libpqrcv_startstreaming ( TimeLineID tli , XLogRecPtr startpoint , char * slotname )
libpqrcv_startstreaming ( WalReceiverConn * conn ,
TimeLineID tli , XLogRecPtr startpoint ,
const char * slotname )
{
char cmd [ 256 ] ;
StringInfoData cmd ;
PGresult * res ;
Assert ( ! conn - > logical ) ;
initStringInfo ( & cmd ) ;
/* Start streaming from the point requested by startup process */
if ( slotname ! = NULL )
snprintf ( cmd , sizeof ( cmd ) ,
" START_REPLICATION SLOT \" %s \" %X/%X TIMELINE %u " , slotname ,
( uint32 ) ( startpoint > > 32 ) , ( uint32 ) startpoint , tli ) ;
appendStringInfo ( & cmd ,
" START_REPLICATION SLOT \" %s \" %X/%X TIMELINE %u " ,
slotname ,
( uint32 ) ( startpoint > > 32 ) , ( uint32 ) startpoint ,
tli ) ;
else
snprintf ( cmd , sizeof ( cmd ) ,
" START_REPLICATION %X/%X TIMELINE %u " ,
( uint32 ) ( startpoint > > 32 ) , ( uint32 ) startpoint , tli ) ;
res = libpqrcv_PQexec ( cmd ) ;
appendStringInfo ( & cmd , " START_REPLICATION %X/%X TIMELINE %u " ,
( uint32 ) ( startpoint > > 32 ) , ( uint32 ) startpoint ,
tli ) ;
res = libpqrcv_PQexec ( conn - > streamConn , cmd . data ) ;
pfree ( cmd . data ) ;
if ( PQresultStatus ( res ) = = PGRES_COMMAND_OK )
{
@ -252,7 +267,7 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
PQclear ( res ) ;
ereport ( ERROR ,
( errmsg ( " could not start WAL streaming: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
}
PQclear ( res ) ;
return true ;
@ -263,14 +278,17 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname)
* reported by the server , or 0 if it did not report it .
*/
static void
libpqrcv_endstreaming ( TimeLineID * next_tli )
libpqrcv_endstreaming ( WalReceiverConn * conn , TimeLineID * next_tli )
{
PGresult * res ;
if ( PQputCopyEnd ( streamConn , NULL ) < = 0 | | PQflush ( streamConn ) )
if ( PQputCopyEnd ( conn - > streamConn , NULL ) < = 0 | |
PQflush ( conn - > streamConn ) )
ereport ( ERROR ,
( errmsg ( " could not send end-of-streaming message to primary: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
* next_tli = 0 ;
/*
* After COPY is finished , we should receive a result set indicating the
@ -282,7 +300,7 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
* called after receiving CopyDone from the backend - the walreceiver
* never terminates replication on its own initiative .
*/
res = PQgetResult ( streamConn ) ;
res = PQgetResult ( conn - > streamConn ) ;
if ( PQresultStatus ( res ) = = PGRES_TUPLES_OK )
{
/*
@ -296,47 +314,58 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
PQclear ( res ) ;
/* the result set should be followed by CommandComplete */
res = PQgetResult ( streamConn ) ;
res = PQgetResult ( conn - > streamConn ) ;
}
else if ( PQresultStatus ( res ) = = PGRES_COPY_OUT )
{
PQclear ( res ) ;
/* End the copy */
PQendcopy ( conn - > streamConn ) ;
/* CommandComplete should follow */
res = PQgetResult ( conn - > streamConn ) ;
}
else
* next_tli = 0 ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
ereport ( ERROR ,
( errmsg ( " error reading result of streaming command: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
PQclear ( res ) ;
/* Verify that there are no more results */
res = PQgetResult ( streamConn ) ;
res = PQgetResult ( conn - > streamConn ) ;
if ( res ! = NULL )
ereport ( ERROR ,
( errmsg ( " unexpected result after CommandComplete: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
}
/*
* Fetch the timeline history file for ' tli ' from primary .
*/
static void
libpqrcv_readtimelinehistoryfile ( TimeLineID tli ,
char * * filename , char * * content , int * len )
libpqrcv_readtimelinehistoryfile ( WalReceiverConn * conn ,
TimeLineID tli , char * * filename ,
char * * content , int * len )
{
PGresult * res ;
char cmd [ 64 ] ;
Assert ( ! conn - > logical ) ;
/*
* Request the primary to send over the history file for given timeline .
*/
snprintf ( cmd , sizeof ( cmd ) , " TIMELINE_HISTORY %u " , tli ) ;
res = libpqrcv_PQexec ( cmd ) ;
res = libpqrcv_PQexec ( conn - > streamConn , c md ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
PQclear ( res ) ;
ereport ( ERROR ,
( errmsg ( " could not receive timeline history file from "
" the primary server: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
}
if ( PQnfields ( res ) ! = 2 | | PQntuples ( res ) ! = 1 )
{
@ -374,7 +403,7 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
* Queries are always executed on the connection in streamConn .
*/
static PGresult *
libpqrcv_PQexec ( const char * query )
libpqrcv_PQexec ( PGconn * streamConn , const char * query )
{
PGresult * result = NULL ;
PGresult * lastResult = NULL ;
@ -455,10 +484,12 @@ libpqrcv_PQexec(const char *query)
* Disconnect connection to primary , if any .
*/
static void
libpqrcv_disconnect ( void )
libpqrcv_disconnect ( WalReceiverConn * conn )
{
PQfinish ( streamConn ) ;
streamConn = NULL ;
PQfinish ( conn - > streamConn ) ;
if ( conn - > recvBuf ! = NULL )
PQfreemem ( conn - > recvBuf ) ;
pfree ( conn ) ;
}
/*
@ -478,30 +509,31 @@ libpqrcv_disconnect(void)
* ereports on error .
*/
static int
libpqrcv_receive ( char * * buffer , pgsocket * wait_fd )
libpqrcv_receive ( WalReceiverConn * conn , char * * buffer ,
pgsocket * wait_fd )
{
int rawlen ;
if ( recvBuf ! = NULL )
PQfreemem ( recvBuf ) ;
recvBuf = NULL ;
if ( conn - > recvBuf ! = NULL )
PQfreemem ( conn - > recvBuf ) ;
conn - > recvBuf = NULL ;
/* Try to receive a CopyData message */
rawlen = PQgetCopyData ( streamConn , & recvBuf , 1 ) ;
rawlen = PQgetCopyData ( conn - > streamConn , & conn - > recvBuf , 1 ) ;
if ( rawlen = = 0 )
{
/* Try consuming some data. */
if ( PQconsumeInput ( streamConn ) = = 0 )
if ( PQconsumeInput ( conn - > streamConn ) = = 0 )
ereport ( ERROR ,
( errmsg ( " could not receive data from WAL stream: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData ( streamConn , & recvBuf , 1 ) ;
rawlen = PQgetCopyData ( conn - > streamConn , & conn - > recvBuf , 1 ) ;
if ( rawlen = = 0 )
{
/* Tell caller to try again when our socket is ready. */
* wait_fd = PQsocket ( streamConn ) ;
* wait_fd = PQsocket ( conn - > streamConn ) ;
return 0 ;
}
}
@ -509,7 +541,7 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
{
PGresult * res ;
res = PQgetResult ( streamConn ) ;
res = PQgetResult ( conn - > streamConn ) ;
if ( PQresultStatus ( res ) = = PGRES_COMMAND_OK | |
PQresultStatus ( res ) = = PGRES_COPY_IN )
{
@ -521,16 +553,16 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
PQclear ( res ) ;
ereport ( ERROR ,
( errmsg ( " could not receive data from WAL stream: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
}
}
if ( rawlen < - 1 )
ereport ( ERROR ,
( errmsg ( " could not receive data from WAL stream: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
/* Return received messages to caller */
* buffer = recvBuf ;
* buffer = conn - > recvBuf ;
return rawlen ;
}
@ -540,11 +572,11 @@ libpqrcv_receive(char **buffer, pgsocket *wait_fd)
* ereports on error .
*/
static void
libpqrcv_send ( const char * buffer , int nbytes )
libpqrcv_send ( WalReceiverConn * conn , const char * buffer , int nbytes )
{
if ( PQputCopyData ( streamConn , buffer , nbytes ) < = 0 | |
PQflush ( streamConn ) )
if ( PQputCopyData ( conn - > streamConn , buffer , nbytes ) < = 0 | |
PQflush ( conn - > streamConn ) )
ereport ( ERROR ,
( errmsg ( " could not send data to WAL stream: %s " ,
PQerrorMessage ( streamConn ) ) ) ) ;
PQerrorMessage ( conn - > streamConn ) ) ) ) ;
}