@ -27,6 +27,7 @@
# include "receivelog.h"
# include "streamutil.h"
# include "pqexpbuffer.h"
# include "common/fe_memutils.h"
# include "datatype/timestamp.h"
@ -227,11 +228,183 @@ GetConnection(void)
return tmpconn ;
}
/*
* Run IDENTIFY_SYSTEM through a given connection and give back to caller
* some result information if requested :
* - Start LSN position
* - Current timeline ID
* - System identifier
* - Plugin name
*/
bool
RunIdentifySystem ( PGconn * conn , char * * sysid , TimeLineID * starttli ,
XLogRecPtr * startpos , char * * db_name )
{
PGresult * res ;
uint32 hi , lo ;
/* Check connection existence */
Assert ( conn ! = NULL ) ;
res = PQexec ( conn , " IDENTIFY_SYSTEM " ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
fprintf ( stderr , _ ( " %s: could not send replication command \" %s \" : %s " ) ,
progname , " IDENTIFY_SYSTEM " , PQerrorMessage ( conn ) ) ;
return false ;
}
if ( PQntuples ( res ) ! = 1 | | PQnfields ( res ) < 3 )
{
fprintf ( stderr ,
_ ( " %s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields \n " ) ,
progname , PQntuples ( res ) , PQnfields ( res ) , 1 , 3 ) ;
return false ;
}
/* Get system identifier */
if ( sysid ! = NULL )
* sysid = pg_strdup ( PQgetvalue ( res , 0 , 0 ) ) ;
/* Get timeline ID to start streaming from */
if ( starttli ! = NULL )
* starttli = atoi ( PQgetvalue ( res , 0 , 1 ) ) ;
/* Get LSN start position if necessary */
if ( startpos ! = NULL )
{
if ( sscanf ( PQgetvalue ( res , 0 , 2 ) , " %X/%X " , & hi , & lo ) ! = 2 )
{
fprintf ( stderr ,
_ ( " %s: could not parse transaction log location \" %s \" \n " ) ,
progname , PQgetvalue ( res , 0 , 2 ) ) ;
return false ;
}
* startpos = ( ( uint64 ) hi ) < < 32 | lo ;
}
/* Get database name, only available in 9.4 and newer versions */
if ( db_name ! = NULL )
{
if ( PQnfields ( res ) < 4 )
fprintf ( stderr ,
_ ( " %s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields \n " ) ,
progname , PQntuples ( res ) , PQnfields ( res ) , 1 , 4 ) ;
if ( PQgetisnull ( res , 0 , 3 ) )
* db_name = NULL ;
else
* db_name = pg_strdup ( PQgetvalue ( res , 0 , 3 ) ) ;
}
PQclear ( res ) ;
return true ;
}
/*
* Create a replication slot for the given connection . This function
* returns true in case of success as well as the start position
* obtained after the slot creation .
*/
bool
CreateReplicationSlot ( PGconn * conn , const char * slot_name , const char * plugin ,
XLogRecPtr * startpos , bool is_physical )
{
PQExpBuffer query ;
PGresult * res ;
query = createPQExpBuffer ( ) ;
Assert ( ( is_physical & & plugin = = NULL ) | |
( ! is_physical & & plugin ! = NULL ) ) ;
Assert ( slot_name ! = NULL ) ;
/* Build query */
if ( is_physical )
appendPQExpBuffer ( query , " CREATE_REPLICATION_SLOT \" %s \" PHYSICAL " ,
slot_name ) ;
else
appendPQExpBuffer ( query , " CREATE_REPLICATION_SLOT \" %s \" LOGICAL \" %s \" " ,
slot_name , plugin ) ;
res = PQexec ( conn , query - > data ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
fprintf ( stderr , _ ( " %s: could not send replication command \" %s \" : %s " ) ,
progname , query - > data , PQerrorMessage ( conn ) ) ;
return false ;
}
if ( PQntuples ( res ) ! = 1 | | PQnfields ( res ) ! = 4 )
{
fprintf ( stderr ,
_ ( " %s: could not create replication slot \" %s \" : got %d rows and %d fields, expected %d rows and %d fields \n " ) ,
progname , slot_name ,
PQntuples ( res ) , PQnfields ( res ) , 1 , 4 ) ;
return false ;
}
/* Get LSN start position if necessary */
if ( startpos ! = NULL )
{
uint32 hi , lo ;
if ( sscanf ( PQgetvalue ( res , 0 , 1 ) , " %X/%X " , & hi , & lo ) ! = 2 )
{
fprintf ( stderr ,
_ ( " %s: could not parse transaction log location \" %s \" \n " ) ,
progname , PQgetvalue ( res , 0 , 1 ) ) ;
return false ;
}
* startpos = ( ( uint64 ) hi ) < < 32 | lo ;
}
PQclear ( res ) ;
return true ;
}
/*
* Drop a replication slot for the given connection . This function
* returns true in case of success .
*/
bool
DropReplicationSlot ( PGconn * conn , const char * slot_name )
{
PQExpBuffer query ;
PGresult * res ;
Assert ( slot_name ! = NULL ) ;
query = createPQExpBuffer ( ) ;
/* Build query */
appendPQExpBuffer ( query , " DROP_REPLICATION_SLOT \" %s \" " ,
slot_name ) ;
res = PQexec ( conn , query - > data ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , _ ( " %s: could not send replication command \" %s \" : %s " ) ,
progname , query - > data , PQerrorMessage ( conn ) ) ;
return false ;
}
if ( PQntuples ( res ) ! = 0 | | PQnfields ( res ) ! = 0 )
{
fprintf ( stderr ,
_ ( " %s: could not drop replication slot \" %s \" : got %d rows and %d fields, expected %d rows and %d fields \n " ) ,
progname , slot_name ,
PQntuples ( res ) , PQnfields ( res ) , 0 , 0 ) ;
return false ;
}
PQclear ( res ) ;
return true ;
}
/*
* Frontend version of GetCurrentTimestamp ( ) , since we are not linked with
* backend code . The protocol always uses integer timestamps , regardless of
* server setting .
* backend code . The replication protocol always uses integer timestamps ,
* regardless of the server setting .
*/
int64
feGetCurrentTimestamp ( void )