@ -1,4 +1,4 @@
package gatewayclient
package index gateway
import (
"context"
@ -30,7 +30,6 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/indexgateway"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
@ -42,12 +41,12 @@ const (
maxConcurrentGrpcCalls = 10
)
// IndexGateway ClientConfig configures the Index Gateway client used to
// communicate with the Index Gateway server.
type IndexGateway ClientConfig struct {
// ClientConfig configures the Index Gateway client used to communicate with
// the Index Gateway server.
type ClientConfig struct {
// Mode sets in which mode the client will operate. It is actually defined at the
// index_gateway YAML section and reused here.
Mode indexgateway . Mode ` yaml:"-" `
Mode Mode ` yaml:"-" `
// PoolConfig defines the behavior of the gRPC connection pool used to communicate
// with the Index Gateway.
@ -87,39 +86,32 @@ type IndexGatewayClientConfig struct {
// RegisterFlagsWithPrefix register client-specific flags with the given prefix.
//
// Flags that are used by both, client and server, are defined in the indexgateway package.
func ( i * IndexGateway ClientConfig) RegisterFlagsWithPrefix ( prefix string , f * flag . FlagSet ) {
func ( i * ClientConfig ) RegisterFlagsWithPrefix ( prefix string , f * flag . FlagSet ) {
i . GRPCClientConfig . RegisterFlagsWithPrefix ( prefix + ".grpc" , f )
f . StringVar ( & i . Address , prefix + ".server-address" , "" , "Hostname or IP of the Index Gateway gRPC server running in simple mode. Can also be prefixed with dns+, dnssrv+, or dnssrvnoa+ to resolve a DNS A record with multiple IP's, a DNS SRV record with a followup A record lookup, or a DNS SRV record without a followup A record lookup, respectively." )
f . BoolVar ( & i . LogGatewayRequests , prefix + ".log-gateway-requests" , false , "Whether requests sent to the gateway should be logged or not." )
}
func ( i * IndexGateway ClientConfig) RegisterFlags ( f * flag . FlagSet ) {
func ( i * ClientConfig ) RegisterFlags ( f * flag . FlagSet ) {
i . RegisterFlagsWithPrefix ( "index-gateway-client" , f )
}
type GatewayClient struct {
logger log . Logger
cfg IndexGatewayClientConfig
logger log . Logger
cfg ClientConfig
storeGatewayClientRequestDuration * prometheus . HistogramVec
dnsProvider * discovery . DNS
pool * client . Pool
ring ring . ReadRing
limits indexgateway . Limits
done chan struct { }
dnsProvider * discovery . DNS
pool * client . Pool
ring ring . ReadRing
limits Limits
done chan struct { }
}
// NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance.
//
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created using a ring.
// Otherwise, it creates a GRPC connection pool to as many addresses as can be resolved from the given address.
func NewGatewayClient ( cfg IndexGateway ClientConfig, r prometheus . Registerer , limits indexgateway . Limits , logger log . Logger , metricsNamespace string ) ( * GatewayClient , error ) {
func NewGatewayClient ( cfg ClientConfig , r prometheus . Registerer , limits Limits , logger log . Logger , metricsNamespace string ) ( * GatewayClient , error ) {
latency := prometheus . NewHistogramVec ( prometheus . HistogramOpts {
Namespace : constants . Loki ,
Name : "index_gateway_request_duration_seconds" ,
@ -151,7 +143,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, lim
return nil , errors . Wrap ( err , "index gateway grpc dial option" )
}
factory := func ( addr string ) ( client . PoolClient , error ) {
igPool , err := NewIndexGatewayGRPC Pool ( addr , dialOpts )
igPool , err := NewClient Pool ( addr , dialOpts )
if err != nil {
return nil , errors . Wrap ( err , "new index gateway grpc pool" )
}
@ -165,7 +157,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, lim
sgClient . cfg . PoolConfig . ClientCleanupPeriod = 5 * time . Second
sgClient . cfg . PoolConfig . HealthCheckIngesters = true
if sgClient . cfg . Mode == indexgateway . RingMode {
if sgClient . cfg . Mode == RingMode {
sgClient . pool = clientpool . NewPool ( "index-gateway" , sgClient . cfg . PoolConfig , sgClient . ring , client . PoolAddrFunc ( factory ) , logger , metricsNamespace )
} else {
// Note we don't use clientpool.NewPool because we want to provide our own discovery function
@ -380,7 +372,7 @@ func (s *GatewayClient) getShardsFromStatsFallback(
return nil , errors . Wrap ( err , "index gateway client get tenant ID" )
}
p , err := indexgateway . ExtractShardRequestMatchersAndAST ( in . Query )
p , err := ExtractShardRequestMatchersAndAST ( in . Query )
if err != nil {
return nil , errors . Wrap ( err , "failure while falling back to stats for shard calculation" )
@ -531,9 +523,9 @@ func (s *GatewayClient) getServerAddresses(tenantID string) ([]string, error) {
// The GRPC pool we use only does discovery calls when cleaning up already existing connections,
// so the list of addresses should always be provided from the external provider (ring or DNS)
// and not from the RegisteredAddresses method as this list is only populated after a call to GetClientFor
if s . cfg . Mode == indexgateway . RingMode {
r := indexgateway . GetShuffleShardingSubring ( s . ring , tenantID , s . limits )
rs , err := r . GetReplicationSetForOperation ( indexgateway . IndexesRead )
if s . cfg . Mode == RingMode {
r := GetShuffleShardingSubring ( s . ring , tenantID , s . limits )
rs , err := r . GetReplicationSetForOperation ( IndexesRead )
if err != nil {
return nil , errors . Wrap ( err , "index gateway get ring" )
}
@ -587,7 +579,7 @@ func (b *grpcIter) Value() []byte {
return b . Rows [ b . i ] . Value
}
func instrumentation ( cfg IndexGateway ClientConfig, clientRequestDuration * prometheus . HistogramVec ) ( [ ] grpc . UnaryClientInterceptor , [ ] grpc . StreamClientInterceptor ) {
func instrumentation ( cfg ClientConfig , clientRequestDuration * prometheus . HistogramVec ) ( [ ] grpc . UnaryClientInterceptor , [ ] grpc . StreamClientInterceptor ) {
var unaryInterceptors [ ] grpc . UnaryClientInterceptor
unaryInterceptors = append ( unaryInterceptors , cfg . GRPCUnaryClientInterceptors ... )
unaryInterceptors = append ( unaryInterceptors , otgrpc . OpenTracingClientInterceptor ( opentracing . GlobalTracer ( ) ) )