@ -1,17 +1,21 @@
package indexgateway
import (
"cmp"
"context"
"flag"
"fmt"
"io"
"math/rand"
"slices"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/middleware"
@ -77,6 +81,8 @@ type ClientConfig struct {
GRPCUnaryClientInterceptors [ ] grpc . UnaryClientInterceptor ` yaml:"-" `
GRCPStreamClientInterceptors [ ] grpc . StreamClientInterceptor ` yaml:"-" `
TimeBasedShardingBuckets [ ] string ` yaml:"time_based_sharding_buckets" category:"Experimental" `
}
// RegisterFlagsWithPrefix registers client-specific flags with the given prefix.
@ -86,6 +92,13 @@ func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
i . GRPCClientConfig . RegisterFlagsWithPrefix ( prefix + ".grpc" , f )
f . StringVar ( & i . Address , prefix + ".server-address" , "" , "Hostname or IP of the Index Gateway gRPC server running in simple mode. Can also be prefixed with dns+, dnssrv+, or dnssrvnoa+ to resolve a DNS A record with multiple IP's, a DNS SRV record with a followup A record lookup, or a DNS SRV record without a followup A record lookup, respectively." )
f . BoolVar ( & i . LogGatewayRequests , prefix + ".log-gateway-requests" , false , "Whether requests sent to the gateway should be logged or not." )
// Experimental: Time-based client side query sharding
f . Var (
( * flagext . StringSlice ) ( & i . TimeBasedShardingBuckets ) ,
prefix + ".time-based-sharding-buckets" ,
"Experimental: Defines buckets for time-based sharding. Time based sharding only takes affect when index gateways run in simple mode. To enable client side time-based sharding of queries across index gateway instances set at least one bucket in the format of a string representation of a time.Duration, e.g. ['168h', '336h', '504h']" ,
)
}
func ( i * ClientConfig ) RegisterFlags ( f * flag . FlagSet ) {
@ -100,6 +113,7 @@ type GatewayClient struct {
pool * client . Pool
ring ring . ReadRing
limits Limits
buckets [ ] time . Duration
done chan struct { }
}
@ -125,12 +139,25 @@ func NewGatewayClient(cfg ClientConfig, r prometheus.Registerer, limits Limits,
}
}
buckets := make ( [ ] time . Duration , len ( cfg . TimeBasedShardingBuckets ) )
for i := range len ( buckets ) {
b , err := time . ParseDuration ( cfg . TimeBasedShardingBuckets [ i ] )
if err != nil {
level . Warn ( logger ) . Log ( "msg" , "failed to parse time duration of bucket" , "err" , err . Error ( ) , "value" , cfg . TimeBasedShardingBuckets [ i ] )
continue
}
buckets [ i ] = b . Abs ( ) * - 1 // Buckets reference times in the past, so we need negative durations
}
// Sort descending, since we have negative duration values
slices . SortFunc ( buckets , func ( a , b time . Duration ) int { return cmp . Compare ( b , a ) } )
sgClient := & GatewayClient {
logger : logger ,
cfg : cfg ,
storeGatewayClientRequestDuration : latency ,
ring : cfg . Ring ,
limits : limits ,
buckets : buckets ,
done : make ( chan struct { } ) ,
}
@ -241,6 +268,8 @@ func (s *GatewayClient) GetChunkRef(ctx context.Context, in *logproto.GetChunkRe
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetChunkRef ( ctx , in )
return err
} , func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} )
return resp , err
}
@ -253,6 +282,8 @@ func (s *GatewayClient) GetSeries(ctx context.Context, in *logproto.GetSeriesReq
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetSeries ( ctx , in )
return err
} , func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} )
return resp , err
}
@ -265,6 +296,8 @@ func (s *GatewayClient) LabelNamesForMetricName(ctx context.Context, in *logprot
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . LabelNamesForMetricName ( ctx , in )
return err
} , func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} )
return resp , err
}
@ -277,6 +310,8 @@ func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *logpro
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . LabelValuesForMetricName ( ctx , in )
return err
} , func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} )
return resp , err
}
@ -289,6 +324,8 @@ func (s *GatewayClient) GetStats(ctx context.Context, in *logproto.IndexStatsReq
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetStats ( ctx , in )
return err
} , func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} )
return resp , err
}
@ -301,6 +338,8 @@ func (s *GatewayClient) GetVolume(ctx context.Context, in *logproto.VolumeReques
err = s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
resp , err = client . GetVolume ( ctx , in )
return err
} , func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} )
return resp , err
}
@ -349,6 +388,9 @@ func (s *GatewayClient) GetShards(
return nil
} ,
func ( addrs [ ] string ) [ ] string {
return addressesForQueryEndTime ( addrs , in . Through . Time ( ) , s . buckets , time . Now ( ) . UTC ( ) )
} ,
func ( _ error ) bool {
errCt ++
return errCt <= maxErrs
@ -390,7 +432,7 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca
return s . poolDo ( ctx , func ( client logproto . IndexGatewayClient ) error {
return s . clientDoQueries ( ctx , gatewayQueries , queryKeyQueryMap , callback , client )
} )
} , noFilter )
}
@ -428,13 +470,14 @@ func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*l
// poolDo executes the given function for each Index Gateway instance in the ring mapping to the correct tenant in the index.
// In case of callback failure, we'll try another member of the ring for that tenant ID.
func ( s * GatewayClient ) poolDo ( ctx context . Context , callback func ( client logproto . IndexGatewayClient ) error ) error {
return s . poolDoWithStrategy ( ctx , callback , func ( error ) bool { return true } )
func (s *GatewayClient) poolDo(ctx context.Context, callback func(client logproto.IndexGatewayClient) error, filterServerList func([]string) []string) error {
	// The default strategy retries after every failed callback, regardless of
	// the error that was returned.
	alwaysRetry := func(error) bool { return true }

	return s.poolDoWithStrategy(ctx, callback, filterServerList, alwaysRetry)
}
func ( s * GatewayClient ) poolDoWithStrategy (
ctx context . Context ,
callback func ( client logproto . IndexGatewayClient ) error ,
filterServerList func ( [ ] string ) [ ] string ,
shouldRetry func ( error ) bool ,
) error {
userID , err := tenant . TenantID ( ctx )
@ -451,6 +494,18 @@ func (s *GatewayClient) poolDoWithStrategy(
return fmt . Errorf ( "no index gateway instances found for tenant %s" , userID )
}
if s . cfg . Mode == SimpleMode {
slices . Sort ( addrs )
allAddr := strings . Join ( addrs , "," )
addrs = filterServerList ( addrs )
level . Debug ( s . logger ) . Log ( "msg" , "filtered list of index gateway instances" , "all" , allAddr , "filtered" , strings . Join ( addrs , "," ) )
}
// shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant.
rand . Shuffle ( len ( addrs ) , func ( i , j int ) {
addrs [ i ] , addrs [ j ] = addrs [ j ] , addrs [ i ]
} )
var lastErr error
for _ , addr := range addrs {
if s . cfg . LogGatewayRequests {
@ -496,11 +551,6 @@ func (s *GatewayClient) getServerAddresses(tenantID string) ([]string, error) {
addrs = s . dnsProvider . Addresses ( )
}
// shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant.
rand . Shuffle ( len ( addrs ) , func ( i , j int ) {
addrs [ i ] , addrs [ j ] = addrs [ j ] , addrs [ i ]
} )
return addrs , nil
}
@ -554,3 +604,43 @@ func instrumentation(cfg ClientConfig, clientRequestDuration *prometheus.Histogr
return unaryInterceptors , streamInterceptors
}
// addressesForQueryEndTime narrows addrs down to the instances responsible for
// the time bucket that the query end time t falls into.
//
// buckets holds offsets that are added to the start of the current day (derived
// from now) to obtain the bucket boundaries. They are expected to be negative
// and ordered from the most recent boundary to the oldest one. The most recent
// bucket is served by the first half of addrs, each following bucket by half as
// many instances as the one before, and everything older than the last boundary
// by all remaining instances.
//
// addrs is returned unchanged when buckets is empty or when there are fewer
// than 2^len(buckets) addresses. Otherwise the result is a sub-slice of addrs
// that shares its backing array.
func addressesForQueryEndTime(addrs []string, t time.Time, buckets []time.Duration, now time.Time) []string {
	n := len(addrs)
	m := len(buckets)

	// If there are no buckets, return all addresses.
	if m < 1 {
		return addrs
	}

	// The bucketing only really makes sense if there are equal or more than 2^len(buckets) index gateways.
	// Example with 3 buckets and 8 instances:
	// Bucket 0:  now       -> now - 7d   => addrs[0:4]
	// Bucket 1:  now - 7d  -> now - 14d  => addrs[4:6]
	// Bucket 2:  now - 14d -> now - 21d  => addrs[6:7]
	// Remainder: now - 21d -> now - Inf  => addrs[7:8]
	//
	// n>>m == 0 is equivalent to n < 2^m, but unlike 1<<m it cannot overflow
	// when the number of buckets reaches the bit width of int. An overflowing
	// guard would let the loop below produce empty address ranges.
	if n>>m == 0 {
		return addrs
	}

	// Truncate works on the absolute time since the zero time, so bucket
	// boundaries are aligned to midnight UTC.
	today := now.Truncate(24 * time.Hour)

	start, end := 0, n>>1
	for i, bucket := range buckets {
		// t is newer than this bucket's boundary: the current range is the match.
		if t.After(today.Add(bucket)) {
			break
		}
		start = end
		end += n >> (i + 2) // n / 2^(i+2)
		// Past the oldest boundary: the remainder gets every address that is left.
		if i == m-1 {
			end = n
		}
	}

	return addrs[start:end]
}
// noFilter is the identity server-list filter: it returns addrs as given,
// without selecting a subset.
func noFilter(addrs []string) []string {
	return addrs
}