@ -410,7 +410,7 @@ retry:
worker - > relstate = SUBREL_STATE_UNKNOWN ;
worker - > relstate = SUBREL_STATE_UNKNOWN ;
worker - > relstate_lsn = InvalidXLogRecPtr ;
worker - > relstate_lsn = InvalidXLogRecPtr ;
worker - > stream_fileset = NULL ;
worker - > stream_fileset = NULL ;
worker - > apply_ leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid ;
worker - > leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid ;
worker - > parallel_apply = is_parallel_apply_worker ;
worker - > parallel_apply = is_parallel_apply_worker ;
worker - > last_lsn = InvalidXLogRecPtr ;
worker - > last_lsn = InvalidXLogRecPtr ;
TIMESTAMP_NOBEGIN ( worker - > last_send_time ) ;
TIMESTAMP_NOBEGIN ( worker - > last_send_time ) ;
@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker - > userid = InvalidOid ;
worker - > userid = InvalidOid ;
worker - > subid = InvalidOid ;
worker - > subid = InvalidOid ;
worker - > relid = InvalidOid ;
worker - > relid = InvalidOid ;
worker - > apply_ leader_pid = InvalidPid ;
worker - > leader_pid = InvalidPid ;
worker - > parallel_apply = false ;
worker - > parallel_apply = false ;
}
}
@ -1066,13 +1066,41 @@ IsLogicalLauncher(void)
return LogicalRepCtx - > launcher_pid = = MyProcPid ;
return LogicalRepCtx - > launcher_pid = = MyProcPid ;
}
}
/*
* Return the pid of the leader apply worker if the given pid is the pid of a
* parallel apply worker , otherwise , return InvalidPid .
*/
pid_t
GetLeaderApplyWorkerPid ( pid_t pid )
{
int leader_pid = InvalidPid ;
int i ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
for ( i = 0 ; i < max_logical_replication_workers ; i + + )
{
LogicalRepWorker * w = & LogicalRepCtx - > workers [ i ] ;
if ( isParallelApplyWorker ( w ) & & w - > proc & & pid = = w - > proc - > pid )
{
leader_pid = w - > leader_pid ;
break ;
}
}
LWLockRelease ( LogicalRepWorkerLock ) ;
return leader_pid ;
}
/*
/*
* Returns state of the subscriptions .
* Returns state of the subscriptions .
*/
*/
Datum
Datum
pg_stat_get_subscription ( PG_FUNCTION_ARGS )
pg_stat_get_subscription ( PG_FUNCTION_ARGS )
{
{
# define PG_STAT_GET_SUBSCRIPTION_COLS 8
# define PG_STAT_GET_SUBSCRIPTION_COLS 9
Oid subid = PG_ARGISNULL ( 0 ) ? InvalidOid : PG_GETARG_OID ( 0 ) ;
Oid subid = PG_ARGISNULL ( 0 ) ? InvalidOid : PG_GETARG_OID ( 0 ) ;
int i ;
int i ;
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if ( OidIsValid ( subid ) & & worker . subid ! = subid )
if ( OidIsValid ( subid ) & & worker . subid ! = subid )
continue ;
continue ;
/* Skip if this is a parallel apply worker */
if ( isParallelApplyWorker ( & worker ) )
continue ;
worker_pid = worker . proc - > pid ;
worker_pid = worker . proc - > pid ;
values [ 0 ] = ObjectIdGetDatum ( worker . subid ) ;
values [ 0 ] = ObjectIdGetDatum ( worker . subid ) ;
@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
else
else
nulls [ 1 ] = true ;
nulls [ 1 ] = true ;
values [ 2 ] = Int32GetDatum ( worker_pid ) ;
values [ 2 ] = Int32GetDatum ( worker_pid ) ;
if ( XLogRecPtrIsInvalid ( worker . last_lsn ) )
if ( isParallelApplyWorker ( & worker ) )
values [ 3 ] = Int32GetDatum ( worker . leader_pid ) ;
else
nulls [ 3 ] = true ;
nulls [ 3 ] = true ;
if ( XLogRecPtrIsInvalid ( worker . last_lsn ) )
nulls [ 4 ] = true ;
else
else
values [ 3 ] = LSNGetDatum ( worker . last_lsn ) ;
values [ 4 ] = LSNGetDatum ( worker . last_lsn ) ;
if ( worker . last_send_time = = 0 )
if ( worker . last_send_time = = 0 )
nulls [ 4 ] = true ;
nulls [ 5 ] = true ;
else
else
values [ 4 ] = TimestampTzGetDatum ( worker . last_send_time ) ;
values [ 5 ] = TimestampTzGetDatum ( worker . last_send_time ) ;
if ( worker . last_recv_time = = 0 )
if ( worker . last_recv_time = = 0 )
nulls [ 5 ] = true ;
nulls [ 6 ] = true ;
else
else
values [ 5 ] = TimestampTzGetDatum ( worker . last_recv_time ) ;
values [ 6 ] = TimestampTzGetDatum ( worker . last_recv_time ) ;
if ( XLogRecPtrIsInvalid ( worker . reply_lsn ) )
if ( XLogRecPtrIsInvalid ( worker . reply_lsn ) )
nulls [ 6 ] = true ;
nulls [ 7 ] = true ;
else
else
values [ 6 ] = LSNGetDatum ( worker . reply_lsn ) ;
values [ 7 ] = LSNGetDatum ( worker . reply_lsn ) ;
if ( worker . reply_time = = 0 )
if ( worker . reply_time = = 0 )
nulls [ 7 ] = true ;
nulls [ 8 ] = true ;
else
else
values [ 7 ] = TimestampTzGetDatum ( worker . reply_time ) ;
values [ 8 ] = TimestampTzGetDatum ( worker . reply_time ) ;
tuplestore_putvalues ( rsinfo - > setResult , rsinfo - > setDesc ,
tuplestore_putvalues ( rsinfo - > setResult , rsinfo - > setDesc ,
values , nulls ) ;
values , nulls ) ;