@ -1126,12 +1126,13 @@ static void
parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd ,
parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd ,
bool * reserve_wal ,
bool * reserve_wal ,
CRSSnapshotAction * snapshot_action ,
CRSSnapshotAction * snapshot_action ,
bool * two_phase )
bool * two_phase , bool * failover )
{
{
ListCell * lc ;
ListCell * lc ;
bool snapshot_action_given = false ;
bool snapshot_action_given = false ;
bool reserve_wal_given = false ;
bool reserve_wal_given = false ;
bool two_phase_given = false ;
bool two_phase_given = false ;
bool failover_given = false ;
/* Parse options */
/* Parse options */
foreach ( lc , cmd - > options )
foreach ( lc , cmd - > options )
@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
two_phase_given = true ;
two_phase_given = true ;
* two_phase = defGetBoolean ( defel ) ;
* two_phase = defGetBoolean ( defel ) ;
}
}
else if ( strcmp ( defel - > defname , " failover " ) = = 0 )
{
if ( failover_given | | cmd - > kind ! = REPLICATION_KIND_LOGICAL )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
failover_given = true ;
* failover = defGetBoolean ( defel ) ;
}
else
else
elog ( ERROR , " unrecognized option: %s " , defel - > defname ) ;
elog ( ERROR , " unrecognized option: %s " , defel - > defname ) ;
}
}
@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
char * slot_name ;
char * slot_name ;
bool reserve_wal = false ;
bool reserve_wal = false ;
bool two_phase = false ;
bool two_phase = false ;
bool failover = false ;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT ;
CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT ;
DestReceiver * dest ;
DestReceiver * dest ;
TupOutputState * tstate ;
TupOutputState * tstate ;
@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Assert ( ! MyReplicationSlot ) ;
Assert ( ! MyReplicationSlot ) ;
parseCreateReplSlotOptions ( cmd , & reserve_wal , & snapshot_action , & two_phase ) ;
parseCreateReplSlotOptions ( cmd , & reserve_wal , & snapshot_action , & two_phase ,
& failover ) ;
if ( cmd - > kind = = REPLICATION_KIND_PHYSICAL )
if ( cmd - > kind = = REPLICATION_KIND_PHYSICAL )
{
{
@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
*/
ReplicationSlotCreate ( cmd - > slotname , true ,
ReplicationSlotCreate ( cmd - > slotname , true ,
cmd - > temporary ? RS_TEMPORARY : RS_EPHEMERAL ,
cmd - > temporary ? RS_TEMPORARY : RS_EPHEMERAL ,
two_phase , false ) ;
two_phase , failover ) ;
/*
/*
* Do options check early so that we can bail before calling the
* Do options check early so that we can bail before calling the
@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
ReplicationSlotDrop ( cmd - > slotname , ! cmd - > wait ) ;
ReplicationSlotDrop ( cmd - > slotname , ! cmd - > wait ) ;
}
}
/*
* Process extra options given to ALTER_REPLICATION_SLOT .
*/
static void
ParseAlterReplSlotOptions ( AlterReplicationSlotCmd * cmd , bool * failover )
{
bool failover_given = false ;
/* Parse options */
foreach_ptr ( DefElem , defel , cmd - > options )
{
if ( strcmp ( defel - > defname , " failover " ) = = 0 )
{
if ( failover_given )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
failover_given = true ;
* failover = defGetBoolean ( defel ) ;
}
else
elog ( ERROR , " unrecognized option: %s " , defel - > defname ) ;
}
}
/*
* Change the definition of a replication slot .
*/
static void
AlterReplicationSlot ( AlterReplicationSlotCmd * cmd )
{
bool failover = false ;
ParseAlterReplSlotOptions ( cmd , & failover ) ;
ReplicationSlotAlter ( cmd - > slotname , failover ) ;
}
/*
/*
* Load previously initiated logical slot and prepare for sending data ( via
* Load previously initiated logical slot and prepare for sending data ( via
* WalSndLoop ) .
* WalSndLoop ) .
@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
EndReplicationCommand ( cmdtag ) ;
EndReplicationCommand ( cmdtag ) ;
break ;
break ;
case T_AlterReplicationSlotCmd :
cmdtag = " ALTER_REPLICATION_SLOT " ;
set_ps_display ( cmdtag ) ;
AlterReplicationSlot ( ( AlterReplicationSlotCmd * ) cmd_node ) ;
EndReplicationCommand ( cmdtag ) ;
break ;
case T_StartReplicationCmd :
case T_StartReplicationCmd :
{
{
StartReplicationCmd * cmd = ( StartReplicationCmd * ) cmd_node ;
StartReplicationCmd * cmd = ( StartReplicationCmd * ) cmd_node ;