@ -22,6 +22,7 @@
# include "miscadmin.h"
# include "access/xlog_internal.h"
# include "access/xlogutils.h"
# include "catalog/pg_type.h"
@ -100,108 +101,6 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
p - > returned_rows + + ;
}
/*
* TODO : This is duplicate code with pg_xlogdump , similar to walsender . c , but
* we currently don ' t have the infrastructure ( elog ! ) to share it .
*/
static void
XLogRead ( char * buf , TimeLineID tli , XLogRecPtr startptr , Size count )
{
char * p ;
XLogRecPtr recptr ;
Size nbytes ;
static int sendFile = - 1 ;
static XLogSegNo sendSegNo = 0 ;
static uint32 sendOff = 0 ;
p = buf ;
recptr = startptr ;
nbytes = count ;
while ( nbytes > 0 )
{
uint32 startoff ;
int segbytes ;
int readbytes ;
startoff = recptr % XLogSegSize ;
if ( sendFile < 0 | | ! XLByteInSeg ( recptr , sendSegNo ) )
{
char path [ MAXPGPATH ] ;
/* Switch to another logfile segment */
if ( sendFile > = 0 )
close ( sendFile ) ;
XLByteToSeg ( recptr , sendSegNo ) ;
XLogFilePath ( path , tli , sendSegNo ) ;
sendFile = BasicOpenFile ( path , O_RDONLY | PG_BINARY , 0 ) ;
if ( sendFile < 0 )
{
if ( errno = = ENOENT )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " requested WAL segment %s has already been removed " ,
path ) ) ) ;
else
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not open file \" %s \" : %m " ,
path ) ) ) ;
}
sendOff = 0 ;
}
/* Need to seek in the file? */
if ( sendOff ! = startoff )
{
if ( lseek ( sendFile , ( off_t ) startoff , SEEK_SET ) < 0 )
{
char path [ MAXPGPATH ] ;
XLogFilePath ( path , tli , sendSegNo ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not seek in log segment %s to offset %u: %m " ,
path , startoff ) ) ) ;
}
sendOff = startoff ;
}
/* How many bytes are within this segment? */
if ( nbytes > ( XLogSegSize - startoff ) )
segbytes = XLogSegSize - startoff ;
else
segbytes = nbytes ;
readbytes = read ( sendFile , p , segbytes ) ;
if ( readbytes < = 0 )
{
char path [ MAXPGPATH ] ;
XLogFilePath ( path , tli , sendSegNo ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read from log segment %s, offset %u, length %lu: %m " ,
path , sendOff , ( unsigned long ) segbytes ) ) ) ;
}
/* Update state for read */
recptr + = readbytes ;
sendOff + = readbytes ;
nbytes - = readbytes ;
p + = readbytes ;
}
}
static void
check_permissions ( void )
{
@ -211,63 +110,12 @@ check_permissions(void)
( errmsg ( " must be superuser or replication role to use replication slots " ) ) ) ) ;
}
/*
* read_page callback for logical decoding contexts .
*
* Public because it would likely be very helpful for someone writing another
* output method outside walsender , e . g . in a bgworker .
*
* TODO : The walsender has it ' s own version of this , but it relies on the
* walsender ' s latch being set whenever WAL is flushed . No such infrastructure
* exists for normal backends , so we have to do a check / sleep / repeat style of
* loop for now .
*/
int
logical_read_local_xlog_page ( XLogReaderState * state , XLogRecPtr targetPagePtr ,
int reqLen , XLogRecPtr targetRecPtr , char * cur_page , TimeLineID * pageTLI )
{
XLogRecPtr flushptr ,
loc ;
int count ;
loc = targetPagePtr + reqLen ;
while ( 1 )
{
/*
* TODO : we ' re going to have to do something more intelligent about
* timelines on standbys . Use readTimeLineHistory ( ) and
* tliOfPointInHistory ( ) to get the proper LSN ? For now we ' ll catch
* that case earlier , but the code and TODO is left in here for when
* that changes .
*/
if ( ! RecoveryInProgress ( ) )
{
* pageTLI = ThisTimeLineID ;
flushptr = GetFlushRecPtr ( ) ;
}
else
flushptr = GetXLogReplayRecPtr ( pageTLI ) ;
if ( loc < = flushptr )
break ;
CHECK_FOR_INTERRUPTS ( ) ;
pg_usleep ( 1000L ) ;
}
/* more than one block available */
if ( targetPagePtr + XLOG_BLCKSZ < = flushptr )
count = XLOG_BLCKSZ ;
/* not enough data there */
else if ( targetPagePtr + reqLen > flushptr )
return - 1 ;
/* part of the page available */
else
count = flushptr - targetPagePtr ;
XLogRead ( cur_page , * pageTLI , targetPagePtr , XLOG_BLCKSZ ) ;
return count ;
return read_local_xlog_page ( state , targetPagePtr , reqLen ,
targetRecPtr , cur_page , pageTLI ) ;
}
/*