@@ -8,6 +8,8 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/concurrency"
+	"github.com/grafana/dskit/user"
 	"golang.org/x/exp/slices"
 	"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
@@ -40,28 +42,32 @@ type responseFromIngesters struct {
 // IngesterQuerier helps with querying the ingesters.
 type IngesterQuerier struct {
-	ring            ring.ReadRing
-	pool            *ring_client.Pool
-	extraQueryDelay time.Duration
-	logger          log.Logger
+	querierConfig          Config
+	ring                   ring.ReadRing
+	partitionRing          *ring.PartitionInstanceRing
+	getShardCountForTenant func(string) int
+	pool                   *ring_client.Pool
+	logger                 log.Logger
 }
 
-func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
+func NewIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
 	factory := func(addr string) (ring_client.PoolClient, error) {
 		return client.New(clientCfg, addr)
 	}
 
-	return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
+	return newIngesterQuerier(querierConfig, clientCfg, ring, partitionRing, getShardCountForTenant, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
 }
 
 // newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory
 // used for testing purposes
-func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
+func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
 	iq := IngesterQuerier{
-		ring:            ring,
-		pool:            clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
-		extraQueryDelay: extraQueryDelay,
-		logger:          logger,
+		querierConfig:          querierConfig,
+		ring:                   ring,
+		partitionRing:          partitionRing,
+		getShardCountForTenant: getShardCountForTenant, // limits?
+		pool:                   clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
+		logger:                 logger,
 	}
 
 	err := services.StartAndAwaitRunning(context.Background(), iq.pool)
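For orientation, this hunk adds two new dependencies to the querier: the `*ring.PartitionInstanceRing` and a `getShardCountForTenant` callback that tells the querier how many partition shards a tenant spans; where that callback comes from is not shown in this diff. A minimal wiring sketch, assuming the shard count is read from a per-tenant limits/overrides object (the `Limits` interface, the `IngestionPartitionsTenantShardSize` accessor, and the surrounding names are illustrative assumptions, not part of this change):

```go
// Sketch only: wiring the new constructor arguments. The Limits interface and
// its accessor name are assumptions for illustration.
func newPartitionAwareIngesterQuerier(
	ingesterClientCfg client.Config,
	ingesterRing ring.ReadRing,
	partitionRing *ring.PartitionInstanceRing,
	limits Limits, // hypothetical interface exposing a per-tenant partition shard size
	logger log.Logger,
) (*querier.IngesterQuerier, error) {
	querierCfg := querier.Config{
		QueryPartitionIngesters: true,          // route reads through the partition ring
		QueryIngestersWithin:    3 * time.Hour, // also the shuffle-shard lookback used below
	}

	getShardCount := func(tenantID string) int {
		return limits.IngestionPartitionsTenantShardSize(tenantID) // hypothetical accessor
	}

	return querier.NewIngesterQuerier(
		querierCfg,
		ingesterClientCfg,
		ingesterRing,
		partitionRing,
		getShardCount,
		"loki", // metrics namespace
		logger,
	)
}
```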
@@ -73,22 +79,53 @@ func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryD
 }
 
 // forAllIngesters runs f, in parallel, for all ingesters
 // TODO taken from Cortex, see if we can refactor out an usable interface.
 func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+	if q.querierConfig.QueryPartitionIngesters {
+		tenantID, err := user.ExtractOrgID(ctx)
+		if err != nil {
+			return nil, err
+		}
+		tenantShards := q.getShardCountForTenant(tenantID)
+		subring, err := q.partitionRing.ShuffleShardWithLookback(tenantID, tenantShards, q.querierConfig.QueryIngestersWithin, time.Now())
+		if err != nil {
+			return nil, err
+		}
+		replicationSets, err := subring.GetReplicationSetsForOperation(ring.Read)
+		if err != nil {
+			return nil, err
+		}
+		return q.forGivenIngesterSets(ctx, replicationSets, f)
+	}
+
 	replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
 	if err != nil {
 		return nil, err
 	}
 
-	return q.forGivenIngesters(ctx, replicationSet, f)
+	return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f)
 }
 
-// forGivenIngesters runs f, in parallel, for given ingesters
-func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
-	cfg := ring.DoUntilQuorumConfig{
+// forGivenIngesterSets runs f, in parallel, for given ingester sets
+func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+	// Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition.
+	// Ingesters must supply zone information for this to have an effect.
+	config := ring.DoUntilQuorumConfig{
+		MinimizeRequests: true,
+	}
+	return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
+		return q.forGivenIngesters(ctx, set, config, f)
+	})
+}
+
+func defaultQuorumConfig() ring.DoUntilQuorumConfig {
+	return ring.DoUntilQuorumConfig{
 		// Nothing here
 	}
-	results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
+}
+
+// forGivenIngesters runs f, in parallel, for given ingesters
+func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+	results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
 		client, err := q.pool.GetClientFor(ingester.Addr)
 		if err != nil {
 			return responseFromIngesters{addr: ingester.Addr}, err
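The per-partition fan-out in `forGivenIngesterSets` relies on dskit's generic `concurrency.ForEachJobMergeResults`: one job per replication set (one set per partition), with each job's result slice concatenated into the final response. A self-contained sketch of that call shape, using made-up string payloads instead of real replication sets:

```go
// Sketch only: demonstrates the shape of concurrency.ForEachJobMergeResults as
// used above, with toy data instead of ring.ReplicationSet jobs.
package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	jobs := [][]string{{"a", "b"}, {"c"}, {"d", "e"}}

	// Run up to 2 jobs concurrently; each job returns a slice and the helper
	// merges all returned slices into one result.
	merged, err := concurrency.ForEachJobMergeResults[[]string, string](context.Background(), jobs, 2,
		func(_ context.Context, job []string) ([]string, error) {
			return job, nil
		})
	if err != nil {
		panic(err)
	}
	fmt.Println(merged) // contains a..e; ordering across jobs is not guaranteed
}
```

Because `MinimizeRequests` is set on the quorum config, each replication set is normally served by a single ingester request (the ingesters must supply zone information for this to take effect), so the fan-out is roughly one request per partition in the tenant's shard.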
@@ -212,7 +249,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
 	}
 
 	// Instance a tail client for each ingester to re(connect)
-	reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+	reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
 		return client.Tail(ctx, req)
 	})
 	if err != nil {
@@ -260,7 +297,7 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
 		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
 	}
 
-	responses, err := q.forGivenIngesters(ctx, replicationSet, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+	responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
 		resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{})
 		if err != nil {
 			return nil, err
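As a closing note, existing call sites of `forAllIngesters` elsewhere in this file are untouched by the diff: they keep passing a closure over the per-ingester client and transparently pick up the partition-aware routing when `QueryPartitionIngesters` is enabled. The shape of such a call site (illustrative, not part of this diff):

```go
// Sketch only: a typical forAllIngesters call site; req stands in for a
// label/series request handled elsewhere in this file.
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
	return client.Label(ctx, req)
})
if err != nil {
	return nil, err
}
```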