@ -17,11 +17,14 @@
# include "miscadmin.h"
# include "access/htup_details.h"
# include "replication/decode.h"
# include "replication/slot.h"
# include "replication/logical.h"
# include "replication/logicalfuncs.h"
# include "utils/builtins.h"
# include "utils/inval.h"
# include "utils/pg_lsn.h"
# include "utils/resowner.h"
static void
check_permissions ( void )
@ -312,3 +315,200 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
return ( Datum ) 0 ;
}
/*
* Helper function for advancing physical replication slot forward .
*/
static XLogRecPtr
pg_physical_replication_slot_advance ( XLogRecPtr startlsn , XLogRecPtr moveto )
{
XLogRecPtr retlsn = InvalidXLogRecPtr ;
SpinLockAcquire ( & MyReplicationSlot - > mutex ) ;
if ( MyReplicationSlot - > data . restart_lsn < moveto )
{
MyReplicationSlot - > data . restart_lsn = moveto ;
retlsn = moveto ;
}
SpinLockRelease ( & MyReplicationSlot - > mutex ) ;
return retlsn ;
}
/*
* Helper function for advancing logical replication slot forward .
*/
static XLogRecPtr
pg_logical_replication_slot_advance ( XLogRecPtr startlsn , XLogRecPtr moveto )
{
LogicalDecodingContext * ctx ;
ResourceOwner old_resowner = CurrentResourceOwner ;
XLogRecPtr retlsn = InvalidXLogRecPtr ;
PG_TRY ( ) ;
{
/* restart at slot's confirmed_flush */
ctx = CreateDecodingContext ( InvalidXLogRecPtr ,
NIL ,
true ,
logical_read_local_xlog_page ,
NULL , NULL , NULL ) ;
CurrentResourceOwner = ResourceOwnerCreate ( CurrentResourceOwner ,
" logical decoding " ) ;
/* invalidate non-timetravel entries */
InvalidateSystemCaches ( ) ;
/* Decode until we run out of records */
while ( ( startlsn ! = InvalidXLogRecPtr & & startlsn < moveto ) | |
( ctx - > reader - > EndRecPtr ! = InvalidXLogRecPtr & & ctx - > reader - > EndRecPtr < moveto ) )
{
XLogRecord * record ;
char * errm = NULL ;
record = XLogReadRecord ( ctx - > reader , startlsn , & errm ) ;
if ( errm )
elog ( ERROR , " %s " , errm ) ;
/*
* Now that we ' ve set up the xlog reader state , subsequent calls
* pass InvalidXLogRecPtr to say " continue from last record "
*/
startlsn = InvalidXLogRecPtr ;
/*
* The { begin_txn , change , commit_txn } _wrapper callbacks above will
* store the description into our tuplestore .
*/
if ( record ! = NULL )
LogicalDecodingProcessRecord ( ctx , ctx - > reader ) ;
/* check limits */
if ( moveto < = ctx - > reader - > EndRecPtr )
break ;
CHECK_FOR_INTERRUPTS ( ) ;
}
CurrentResourceOwner = old_resowner ;
if ( ctx - > reader - > EndRecPtr ! = InvalidXLogRecPtr )
{
LogicalConfirmReceivedLocation ( moveto ) ;
/*
* If only the confirmed_flush_lsn has changed the slot won ' t get
* marked as dirty by the above . Callers on the walsender
* interface are expected to keep track of their own progress and
* don ' t need it written out . But SQL - interface users cannot
* specify their own start positions and it ' s harder for them to
* keep track of their progress , so we should make more of an
* effort to save it for them .
*
* Dirty the slot so it ' s written out at the next checkpoint .
* We ' ll still lose its position on crash , as documented , but it ' s
* better than always losing the position even on clean restart .
*/
ReplicationSlotMarkDirty ( ) ;
}
retlsn = MyReplicationSlot - > data . confirmed_flush ;
/* free context, call shutdown callback */
FreeDecodingContext ( ctx ) ;
InvalidateSystemCaches ( ) ;
}
PG_CATCH ( ) ;
{
/* clear all timetravel entries */
InvalidateSystemCaches ( ) ;
PG_RE_THROW ( ) ;
}
PG_END_TRY ( ) ;
return retlsn ;
}
/*
* SQL function for moving the position in a replication slot .
*/
Datum
pg_replication_slot_advance ( PG_FUNCTION_ARGS )
{
Name slotname = PG_GETARG_NAME ( 0 ) ;
XLogRecPtr moveto = PG_GETARG_LSN ( 1 ) ;
XLogRecPtr endlsn ;
XLogRecPtr startlsn ;
TupleDesc tupdesc ;
Datum values [ 2 ] ;
bool nulls [ 2 ] ;
HeapTuple tuple ;
Datum result ;
Assert ( ! MyReplicationSlot ) ;
check_permissions ( ) ;
if ( XLogRecPtrIsInvalid ( moveto ) )
ereport ( ERROR ,
( errmsg ( " invalid target wal lsn " ) ) ) ;
/* Build a tuple descriptor for our result type */
if ( get_call_result_type ( fcinfo , NULL , & tupdesc ) ! = TYPEFUNC_COMPOSITE )
elog ( ERROR , " return type must be a row type " ) ;
/*
* We can ' t move slot past what ' s been flushed / replayed so clamp the
* target possition accordingly .
*/
if ( ! RecoveryInProgress ( ) )
moveto = Min ( moveto , GetFlushRecPtr ( ) ) ;
else
moveto = Min ( moveto , GetXLogReplayRecPtr ( & ThisTimeLineID ) ) ;
/* Acquire the slot so we "own" it */
ReplicationSlotAcquire ( NameStr ( * slotname ) , true ) ;
startlsn = MyReplicationSlot - > data . confirmed_flush ;
if ( moveto < startlsn )
{
ReplicationSlotRelease ( ) ;
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot move slot to %X/%X, minimum is %X/%X " ,
( uint32 ) ( moveto > > 32 ) , ( uint32 ) moveto ,
( uint32 ) ( MyReplicationSlot - > data . confirmed_flush > > 32 ) ,
( uint32 ) ( MyReplicationSlot - > data . confirmed_flush ) ) ) ) ;
}
if ( OidIsValid ( MyReplicationSlot - > data . database ) )
endlsn = pg_logical_replication_slot_advance ( startlsn , moveto ) ;
else
endlsn = pg_physical_replication_slot_advance ( startlsn , moveto ) ;
values [ 0 ] = NameGetDatum ( & MyReplicationSlot - > data . name ) ;
nulls [ 0 ] = false ;
/* Update the on disk state when lsn was updated. */
if ( XLogRecPtrIsInvalid ( endlsn ) )
{
ReplicationSlotMarkDirty ( ) ;
ReplicationSlotsComputeRequiredXmin ( false ) ;
ReplicationSlotsComputeRequiredLSN ( ) ;
ReplicationSlotSave ( ) ;
}
ReplicationSlotRelease ( ) ;
/* Return the reached position. */
values [ 1 ] = LSNGetDatum ( endlsn ) ;
nulls [ 1 ] = false ;
tuple = heap_form_tuple ( tupdesc , values , nulls ) ;
result = HeapTupleGetDatum ( tuple ) ;
PG_RETURN_DATUM ( result ) ;
}