@ -232,6 +232,7 @@ static void XLogSendLogical(void);
static void WalSndDone ( WalSndSendDataCallback send_data ) ;
static XLogRecPtr GetStandbyFlushRecPtr ( void ) ;
static void IdentifySystem ( void ) ;
static void ReadReplicationSlot ( ReadReplicationSlotCmd * cmd ) ;
static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd ) ;
static void DropReplicationSlot ( DropReplicationSlotCmd * cmd ) ;
static void StartReplication ( StartReplicationCmd * cmd ) ;
@ -457,6 +458,104 @@ IdentifySystem(void)
end_tup_output ( tstate ) ;
}
/* Handle READ_REPLICATION_SLOT command */
static void
ReadReplicationSlot ( ReadReplicationSlotCmd * cmd )
{
# define READ_REPLICATION_SLOT_COLS 3
ReplicationSlot * slot ;
DestReceiver * dest ;
TupOutputState * tstate ;
TupleDesc tupdesc ;
Datum values [ READ_REPLICATION_SLOT_COLS ] ;
bool nulls [ READ_REPLICATION_SLOT_COLS ] ;
tupdesc = CreateTemplateTupleDesc ( READ_REPLICATION_SLOT_COLS ) ;
TupleDescInitBuiltinEntry ( tupdesc , ( AttrNumber ) 1 , " slot_type " ,
TEXTOID , - 1 , 0 ) ;
TupleDescInitBuiltinEntry ( tupdesc , ( AttrNumber ) 2 , " restart_lsn " ,
TEXTOID , - 1 , 0 ) ;
/* TimeLineID is unsigned, so int4 is not wide enough. */
TupleDescInitBuiltinEntry ( tupdesc , ( AttrNumber ) 3 , " restart_tli " ,
INT8OID , - 1 , 0 ) ;
MemSet ( values , 0 , READ_REPLICATION_SLOT_COLS * sizeof ( Datum ) ) ;
MemSet ( nulls , true , READ_REPLICATION_SLOT_COLS * sizeof ( bool ) ) ;
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
slot = SearchNamedReplicationSlot ( cmd - > slotname , false ) ;
if ( slot = = NULL | | ! slot - > in_use )
{
LWLockRelease ( ReplicationSlotControlLock ) ;
}
else
{
ReplicationSlot slot_contents ;
int i = 0 ;
/* Copy slot contents while holding spinlock */
SpinLockAcquire ( & slot - > mutex ) ;
slot_contents = * slot ;
SpinLockRelease ( & slot - > mutex ) ;
LWLockRelease ( ReplicationSlotControlLock ) ;
if ( OidIsValid ( slot_contents . data . database ) )
ereport ( ERROR ,
errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot use \" %s \" with logical replication slot \" %s \" " ,
" READ_REPLICATION_SLOT " ,
NameStr ( slot_contents . data . name ) ) ) ;
/* slot type */
values [ i ] = CStringGetTextDatum ( " physical " ) ;
nulls [ i ] = false ;
i + + ;
/* start LSN */
if ( ! XLogRecPtrIsInvalid ( slot_contents . data . restart_lsn ) )
{
char xloc [ 64 ] ;
snprintf ( xloc , sizeof ( xloc ) , " %X/%X " ,
LSN_FORMAT_ARGS ( slot_contents . data . restart_lsn ) ) ;
values [ i ] = CStringGetTextDatum ( xloc ) ;
nulls [ i ] = false ;
}
i + + ;
/* timeline this WAL was produced on */
if ( ! XLogRecPtrIsInvalid ( slot_contents . data . restart_lsn ) )
{
TimeLineID slots_position_timeline ;
TimeLineID current_timeline ;
List * timeline_history = NIL ;
/*
* While in recovery , use as timeline the currently - replaying one
* to get the LSN position ' s history .
*/
if ( RecoveryInProgress ( ) )
( void ) GetXLogReplayRecPtr ( & current_timeline ) ;
else
current_timeline = ThisTimeLineID ;
timeline_history = readTimeLineHistory ( current_timeline ) ;
slots_position_timeline = tliOfPointInHistory ( slot_contents . data . restart_lsn ,
timeline_history ) ;
values [ i ] = Int64GetDatum ( ( int64 ) slots_position_timeline ) ;
nulls [ i ] = false ;
}
i + + ;
Assert ( i = = READ_REPLICATION_SLOT_COLS ) ;
}
dest = CreateDestReceiver ( DestRemoteSimple ) ;
tstate = begin_tup_output_tupdesc ( dest , tupdesc , & TTSOpsVirtual ) ;
do_tup_output ( tstate , values , nulls ) ;
end_tup_output ( tstate ) ;
}
/*
* Handle TIMELINE_HISTORY command .
@ -1622,6 +1721,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand ( cmdtag ) ;
break ;
case T_ReadReplicationSlotCmd :
cmdtag = " READ_REPLICATION_SLOT " ;
set_ps_display ( cmdtag ) ;
ReadReplicationSlot ( ( ReadReplicationSlotCmd * ) cmd_node ) ;
EndReplicationCommand ( cmdtag ) ;
break ;
case T_BaseBackupCmd :
cmdtag = " BASE_BACKUP " ;
set_ps_display ( cmdtag ) ;