@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math/rand"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -14,6 +15,7 @@ import (
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -23,7 +25,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway"
util_log "github.com/grafana/loki/pkg/util/log "
"github.com/grafana/loki/pkg/util/discovery "
util_math "github.com/grafana/loki/pkg/util/math"
)
@ -76,7 +78,7 @@ type IndexGatewayClientConfig struct {
// Flags that are used by both, client and server, are defined in the indexgateway package.
func ( i * IndexGatewayClientConfig ) RegisterFlagsWithPrefix ( prefix string , f * flag . FlagSet ) {
i . GRPCClientConfig . RegisterFlagsWithPrefix ( prefix + ".grpc" , f )
f . StringVar ( & i . Address , prefix + ".server-address" , "" , "Hostname or IP of the Index Gateway gRPC server running in simple mode." )
f . StringVar ( & i . Address , prefix + ".server-address" , "" , "Hostname or IP of the Index Gateway gRPC server running in simple mode. Can also be prefixed with dns+, dnssrv+, or dnssrvnoa+ to resolve a DNS A record with multiple IP's, a DNS SRV record with a followup A record lookup, or a DNS SRV record without a followup A record lookup, respectively. " )
f . BoolVar ( & i . LogGatewayRequests , prefix + ".log-gateway-requests" , false , "Whether requests sent to the gateway should be logged or not." )
}
@ -85,24 +87,27 @@ func (i *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) {
}
type GatewayClient struct {
logger log . Logger
cfg IndexGatewayClientConfig
storeGatewayClientRequestDuration * prometheus . HistogramVec
conn * grpc . ClientConn
grpcClient logproto . IndexGatewayClient
dnsProvider * discovery . DNS
pool * ring_client . Pool
ring ring . ReadRing
limits indexgateway . Limits
done chan struct { }
}
// NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance.
//
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created.
// Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode .
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created using a ring .
// Otherwise, it creates a GRPC connection pool to as many addresses as can be resolved from the given address .
func NewGatewayClient ( cfg IndexGatewayClientConfig , r prometheus . Registerer , limits indexgateway . Limits , logger log . Logger ) ( * GatewayClient , error ) {
latency := prometheus . NewHistogramVec ( prometheus . HistogramOpts {
Namespace : "loki" ,
@ -122,47 +127,89 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, lim
}
sgClient := & GatewayClient {
logger : logger ,
cfg : cfg ,
storeGatewayClientRequestDuration : latency ,
ring : cfg . Ring ,
limits : limits ,
done : make ( chan struct { } ) ,
}
dialOpts , err := cfg . GRPCClientConfig . DialOption ( grpcclient . Instrument ( sgClient . storeGatewayClientRequestDuration ) )
if err != nil {
return nil , errors . Wrap ( err , "index gateway grpc dial option" )
}
factory := func ( addr string ) ( ring_client . PoolClient , error ) {
igPool , err := NewIndexGatewayGRPCPool ( addr , dialOpts )
if err != nil {
return nil , errors . Wrap ( err , "new index gateway grpc pool" )
}
return igPool , nil
}
//FIXME(ewelch) we don't expose the pool configs nor set defaults, and register flags is kind of messed up with remote config being defined somewhere else
//make a separate PR to make the pool config generic so it can be used with proper names in multiple places.
sgClient . cfg . PoolConfig . RemoteTimeout = 2 * time . Second
sgClient . cfg . PoolConfig . ClientCleanupPeriod = 5 * time . Second
sgClient . cfg . PoolConfig . HealthCheckIngesters = true
if sgClient . cfg . Mode == indexgateway . RingMode {
factory := func ( addr string ) ( ring_client . PoolClient , error ) {
igPool , err := NewIndexGatewayGRPCPool ( addr , dialOpts )
sgClient . pool = clientpool . NewPool ( sgClient . cfg . PoolConfig , sgClient . ring , factory , logger )
} else {
// Note we don't use clientpool.NewPool because we want to provide our own discovery function
poolCfg := ring_client . PoolConfig {
CheckInterval : sgClient . cfg . PoolConfig . ClientCleanupPeriod ,
HealthCheckEnabled : sgClient . cfg . PoolConfig . HealthCheckIngesters ,
HealthCheckTimeout : sgClient . cfg . PoolConfig . RemoteTimeout ,
}
clients := prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : "loki" ,
Name : "index_gateway_clients" ,
Help : "The current number of index gateway clients." ,
} )
if r != nil {
err := r . Register ( clients )
if err != nil {
return nil , errors . Wrap ( err , "new index gateway grpc pool" )
alreadyErr , ok := err . ( prometheus . AlreadyRegisteredError )
if ! ok {
return nil , err
}
clients = alreadyErr . ExistingCollector . ( prometheus . Gauge )
}
return igPool , nil
}
//TODO(ewelch) we can't use metrics in the provider because of duplicate registration errors
dnsProvider := discovery . NewDNS ( logger , sgClient . cfg . PoolConfig . ClientCleanupPeriod , sgClient . cfg . Address , nil )
sgClient . dnsProvider = dnsProvider
sgClient . pool = clientpool . NewPool ( cfg . PoolConfig , sgClient . ring , factory , logger )
} else {
sgClient . conn , err = grpc . Dial ( cfg . Address , dialOpts ... )
if err != nil {
return nil , errors . Wrap ( err , "index gateway grpc dial" )
// Make an attempt to do one DNS lookup so we can start with addresses
dnsProvider . RunOnce ( )
discovery := func ( ) ( [ ] string , error ) {
return dnsProvider . Addresses ( ) , nil
}
sgClient . pool = ring_client . NewPool ( "index gateway" , poolCfg , discovery , factory , clients , logger )
sgClient . grpcClient = logproto . NewIndexGatewayClient ( sgClient . conn )
}
// We have to start the pool service, it will handle removing stale clients in the background
err = services . StartAndAwaitRunning ( context . Background ( ) , sgClient . pool )
if err != nil {
return nil , errors . Wrap ( err , "failed to start index gateway connection pool" )
}
return sgClient , nil
}
// Stop stops the execution of this gateway client.
//
// If it is in simple mode, the single GRPC connection is closed. Otherwise, nothing happens.
func ( s * GatewayClient ) Stop ( ) {
if s . cfg . Mode == indexgateway . SimpleMode {
s . conn . Close ( )
ctx , cancel := context . WithTimeoutCause ( context . Background ( ) , 10 * time . Second , errors . New ( "service shutdown timeout expired" ) )
defer cancel ( )
err := services . StopAndAwaitTerminated ( ctx , s . pool )
if err != nil {
level . Error ( s . logger ) . Log ( "msg" , "failed to stop index gateway connection pool" , "err" , err )
}
s . dnsProvider . Stop ( )
}
func ( s * GatewayClient ) QueryPages ( ctx context . Context , queries [ ] index . Query , callback index . QueryPagesCallback ) error {
@ -184,93 +231,75 @@ func (s *GatewayClient) QueryIndex(_ context.Context, _ *logproto.QueryIndexRequ
}
func ( s * GatewayClient ) GetChunkRef ( ctx context . Context , in * logproto . GetChunkRefRequest , opts ... grpc . CallOption ) ( * logproto . GetChunkRefResponse , error ) {
if s . cfg . Mode == indexgateway . RingMode {
var (
resp * logproto . GetChunkRefResponse
err error
)
err = s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetChunkRef ( ctx , in , opts ... )
return err
} )
return resp , err
}
return s . grpcClient . GetChunkRef ( ctx , in , opts ... )
var (
resp * logproto . GetChunkRefResponse
err error
)
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetChunkRef ( ctx , in , opts ... )
return err
} )
return resp , err
}
func ( s * GatewayClient ) GetSeries ( ctx context . Context , in * logproto . GetSeriesRequest , opts ... grpc . CallOption ) ( * logproto . GetSeriesResponse , error ) {
if s . cfg . Mode == indexgateway . RingMode {
var (
resp * logproto . GetSeriesResponse
err error
)
err = s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetSeries ( ctx , in , opts ... )
return err
} )
return resp , err
}
return s . grpcClient . GetSeries ( ctx , in , opts ... )
var (
resp * logproto . GetSeriesResponse
err error
)
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetSeries ( ctx , in , opts ... )
return err
} )
return resp , err
}
func ( s * GatewayClient ) LabelNamesForMetricName ( ctx context . Context , in * logproto . LabelNamesForMetricNameRequest , opts ... grpc . CallOption ) ( * logproto . LabelResponse , error ) {
if s . cfg . Mode == indexgateway . RingMode {
var (
resp * logproto . LabelResponse
err error
)
err = s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . LabelNamesForMetricName ( ctx , in , opts ... )
return err
} )
return resp , err
}
return s . grpcClient . LabelNamesForMetricName ( ctx , in , opts ... )
var (
resp * logproto . LabelResponse
err error
)
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . LabelNamesForMetricName ( ctx , in , opts ... )
return err
} )
return resp , err
}
func ( s * GatewayClient ) LabelValuesForMetricName ( ctx context . Context , in * logproto . LabelValuesForMetricNameRequest , opts ... grpc . CallOption ) ( * logproto . LabelResponse , error ) {
if s . cfg . Mode == indexgateway . RingMode {
var (
resp * logproto . LabelResponse
err error
)
err = s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . LabelValuesForMetricName ( ctx , in , opts ... )
return err
} )
return resp , err
}
return s . grpcClient . LabelValuesForMetricName ( ctx , in , opts ... )
var (
resp * logproto . LabelResponse
err error
)
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . LabelValuesForMetricName ( ctx , in , opts ... )
return err
} )
return resp , err
}
func ( s * GatewayClient ) GetStats ( ctx context . Context , in * logproto . IndexStatsRequest , opts ... grpc . CallOption ) ( * logproto . IndexStatsResponse , error ) {
if s . cfg . Mode == indexgateway . RingMode {
var (
resp * logproto . IndexStatsResponse
err error
)
err = s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetStats ( ctx , in , opts ... )
return err
} )
return resp , err
}
return s . grpcClient . GetStats ( ctx , in , opts ... )
var (
resp * logproto . IndexStatsResponse
err error
)
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetStats ( ctx , in , opts ... )
return err
} )
return resp , err
}
func ( s * GatewayClient ) GetVolume ( ctx context . Context , in * logproto . VolumeRequest , opts ... grpc . CallOption ) ( * logproto . VolumeResponse , error ) {
if s . cfg . Mode == indexgateway . RingMode {
var (
resp * logproto . VolumeResponse
err error
)
err = s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetVolume ( ctx , in , opts ... )
return err
} )
return resp , err
}
return s . grpcClient . GetVolume ( ctx , in , opts ... )
var (
resp * logproto . VolumeResponse
err error
)
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetVolume ( ctx , in , opts ... )
return err
} )
return resp , err
}
func ( s * GatewayClient ) doQueries ( ctx context . Context , queries [ ] index . Query , callback index . QueryPagesCallback ) error {
@ -288,13 +317,10 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca
} )
}
if s . cfg . Mode == indexgateway . RingMode {
return s . ringModeDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
return s . clientDoQueries ( ctx , gatewayQueries , queryKeyQueryMap , callback , client )
} )
}
return s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
return s . clientDoQueries ( ctx , gatewayQueries , queryKeyQueryMap , callback , client )
} )
return s . clientDoQueries ( ctx , gatewayQueries , queryKeyQueryMap , callback , s . grpcClient )
}
// clientDoQueries send a query request to an Index Gateway instance using the given gRPC client.
@ -318,7 +344,7 @@ func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*l
}
query , ok := queryKeyQueryMap [ resp . QueryKey ]
if ! ok {
level . Error ( util_log . L ogger) . Log ( "msg" , fmt . Sprintf ( "unexpected %s QueryKey received, expected queries %s" , resp . QueryKey , fmt . Sprint ( queryKeyQueryMap ) ) )
level . Error ( s . l ogger) . Log ( "msg" , fmt . Sprintf ( "unexpected %s QueryKey received, expected queries %s" , resp . QueryKey , fmt . Sprint ( queryKeyQueryMap ) ) )
return fmt . Errorf ( "unexpected %s QueryKey received" , resp . QueryKey )
}
if ! callback ( query , & readBatch { resp } ) {
@ -329,9 +355,9 @@ func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*l
return nil
}
// ringMode Do executes the given function for each Index Gateway instance in the ring mapping to the correct tenant in the index.
// pool Do executes the given function for each Index Gateway instance in the ring mapping to the correct tenant in the index.
// In case of callback failure, we'll try another member of the ring for that tenant ID.
func ( s * GatewayClient ) ringMode Do( ctx context . Context , callback func ( client logproto . IndexGatewayClient ) error ) error {
func ( s * GatewayClient ) pool Do( ctx context . Context , callback func ( client logproto . IndexGatewayClient ) error ) error {
userID , err := tenant . TenantID ( ctx )
if err != nil {
return errors . Wrap ( err , "index gateway client get tenant ID" )
@ -340,22 +366,28 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
if err != nil {
return err
}
if len ( addrs ) == 0 {
level . Error ( s . logger ) . Log ( "msg" , fmt . Sprintf ( "no index gateway instances found for tenant %s" , userID ) )
return fmt . Errorf ( "no index gateway instances found for tenant %s" , userID )
}
var lastErr error
for _ , addr := range addrs {
if s . cfg . LogGatewayRequests {
level . Debug ( util_log . Logger ) . Log ( "msg" , "sending request to gateway" , "gateway" , addr , "tenant" , userID )
level . Debug ( s . l ogger) . Log ( "msg" , "sending request to gateway" , "gateway" , addr , "tenant" , userID )
}
genericClient , err := s . pool . GetClientFor ( addr )
if err != nil {
level . Error ( util_log . L ogger) . Log ( "msg" , fmt . Sprintf ( "failed to get client for instance %s" , addr ) , "err" , err )
level . Error ( s . l ogger) . Log ( "msg" , fmt . Sprintf ( "failed to get client for instance %s" , addr ) , "err" , err )
continue
}
client := ( genericClient . ( logproto . IndexGatewayClient ) )
if err := callback ( client ) ; err != nil {
lastErr = err
level . Error ( util_log . L ogger) . Log ( "msg" , fmt . Sprintf ( "client do failed for instance %s" , addr ) , "err" , err )
level . Error ( s . l ogger) . Log ( "msg" , fmt . Sprintf ( "client do failed for instance %s" , addr ) , "err" , err )
continue
}
@ -366,13 +398,21 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
}
func ( s * GatewayClient ) getServerAddresses ( tenantID string ) ( [ ] string , error ) {
r := indexgateway . GetShuffleShardingSubring ( s . ring , tenantID , s . limits )
rs , err := r . GetReplicationSetForOperation ( indexgateway . IndexesRead )
if err != nil {
return nil , errors . Wrap ( err , "index gateway get ring" )
var addrs [ ] string
// The GRPC pool we use only does discovery calls when cleaning up already existing connections,
// so the list of addresses should always be provided from the external provider (ring or DNS)
// and not from the RegisteredAddresses method as this list is only populated after a call to GetClientFor
if s . cfg . Mode == indexgateway . RingMode {
r := indexgateway . GetShuffleShardingSubring ( s . ring , tenantID , s . limits )
rs , err := r . GetReplicationSetForOperation ( indexgateway . IndexesRead )
if err != nil {
return nil , errors . Wrap ( err , "index gateway get ring" )
}
addrs = rs . GetAddresses ( )
} else {
addrs = s . dnsProvider . Addresses ( )
}
addrs := rs . GetAddresses ( )
// shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant.
rand . Shuffle ( len ( addrs ) , func ( i , j int ) {
addrs [ i ] , addrs [ j ] = addrs [ j ] , addrs [ i ]