diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 563e433035..bea2d18996 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -6358,6 +6358,14 @@ boltdb_shipper: # CLI flag: -boltdb.shipper.index-gateway-client.log-gateway-requests [log_gateway_requests: | default = false] + # Experimental: Defines buckets for time-based sharding. Time-based sharding + # only takes effect when index gateways run in simple mode. To enable client + # side time-based sharding of queries across index gateway instances set at + # least one bucket in the format of a string representation of a + # time.Duration, e.g. ['168h', '336h', '504h'] + # CLI flag: -boltdb.shipper.index-gateway-client.time-based-sharding-buckets + [time_based_sharding_buckets: | default = []] + [ingestername: | default = ""] [mode: | default = ""] @@ -6413,6 +6421,14 @@ tsdb_shipper: # CLI flag: -tsdb.shipper.index-gateway-client.log-gateway-requests [log_gateway_requests: | default = false] + # Experimental: Defines buckets for time-based sharding. Time-based sharding + # only takes effect when index gateways run in simple mode. To enable client + # side time-based sharding of queries across index gateway instances set at + # least one bucket in the format of a string representation of a + # time.Duration, e.g. 
['168h', '336h', '504h'] + # CLI flag: -tsdb.shipper.index-gateway-client.time-based-sharding-buckets + [time_based_sharding_buckets: | default = []] + [ingestername: | default = ""] [mode: | default = ""] diff --git a/pkg/indexgateway/client.go b/pkg/indexgateway/client.go index 080b944023..8ba9ae37f5 100644 --- a/pkg/indexgateway/client.go +++ b/pkg/indexgateway/client.go @@ -1,17 +1,21 @@ package indexgateway import ( + "cmp" "context" "flag" "fmt" "io" "math/rand" + "slices" + "strings" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/instrument" "github.com/grafana/dskit/middleware" @@ -77,6 +81,8 @@ type ClientConfig struct { GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"` GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"` + + TimeBasedShardingBuckets []string `yaml:"time_based_sharding_buckets" category:"Experimental"` } // RegisterFlagsWithPrefix register client-specific flags with the given prefix. @@ -86,6 +92,13 @@ func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) f.StringVar(&i.Address, prefix+".server-address", "", "Hostname or IP of the Index Gateway gRPC server running in simple mode. 
Can also be prefixed with dns+, dnssrv+, or dnssrvnoa+ to resolve a DNS A record with multiple IP's, a DNS SRV record with a followup A record lookup, or a DNS SRV record without a followup A record lookup, respectively.") f.BoolVar(&i.LogGatewayRequests, prefix+".log-gateway-requests", false, "Whether requests sent to the gateway should be logged or not.") + + // Experimental: Time-based client side query sharding + f.Var( + (*flagext.StringSlice)(&i.TimeBasedShardingBuckets), + prefix+".time-based-sharding-buckets", + "Experimental: Defines buckets for time-based sharding. Time based sharding only takes affect when index gateways run in simple mode. To enable client side time-based sharding of queries across index gateway instances set at least one bucket in the format of a string representation of a time.Duration, e.g. ['168h', '336h', '504h']", + ) } func (i *ClientConfig) RegisterFlags(f *flag.FlagSet) { @@ -100,6 +113,7 @@ type GatewayClient struct { pool *client.Pool ring ring.ReadRing limits Limits + buckets []time.Duration done chan struct{} } @@ -125,12 +139,25 @@ func NewGatewayClient(cfg ClientConfig, r prometheus.Registerer, limits Limits, } } + buckets := make([]time.Duration, len(cfg.TimeBasedShardingBuckets)) + for i := range len(buckets) { + b, err := time.ParseDuration(cfg.TimeBasedShardingBuckets[i]) + if err != nil { + level.Warn(logger).Log("msg", "failed to parse time duration of bucket", "err", err.Error(), "value", cfg.TimeBasedShardingBuckets[i]) + continue + } + buckets[i] = b.Abs() * -1 // Buckets reference times in the past, so we need negative durations + } + // Sort descending, since we have negative duration values + slices.SortFunc(buckets, func(a, b time.Duration) int { return cmp.Compare(b, a) }) + sgClient := &GatewayClient{ logger: logger, cfg: cfg, storeGatewayClientRequestDuration: latency, ring: cfg.Ring, limits: limits, + buckets: buckets, done: make(chan struct{}), } @@ -241,6 +268,8 @@ func (s *GatewayClient) 
GetChunkRef(ctx context.Context, in *logproto.GetChunkRe err = s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { resp, err = client.GetChunkRef(ctx, in) return err + }, func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) }) return resp, err } @@ -253,6 +282,8 @@ func (s *GatewayClient) GetSeries(ctx context.Context, in *logproto.GetSeriesReq err = s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { resp, err = client.GetSeries(ctx, in) return err + }, func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) }) return resp, err } @@ -265,6 +296,8 @@ func (s *GatewayClient) LabelNamesForMetricName(ctx context.Context, in *logprot err = s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { resp, err = client.LabelNamesForMetricName(ctx, in) return err + }, func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) }) return resp, err } @@ -277,6 +310,8 @@ func (s *GatewayClient) LabelValuesForMetricName(ctx context.Context, in *logpro err = s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { resp, err = client.LabelValuesForMetricName(ctx, in) return err + }, func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) }) return resp, err } @@ -289,6 +324,8 @@ func (s *GatewayClient) GetStats(ctx context.Context, in *logproto.IndexStatsReq err = s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { resp, err = client.GetStats(ctx, in) return err + }, func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) }) return resp, err } @@ -301,6 +338,8 @@ func (s *GatewayClient) GetVolume(ctx context.Context, in *logproto.VolumeReques err = s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { resp, 
err = client.GetVolume(ctx, in) return err + }, func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) }) return resp, err } @@ -349,6 +388,9 @@ func (s *GatewayClient) GetShards( return nil }, + func(addrs []string) []string { + return addressesForQueryEndTime(addrs, in.Through.Time(), s.buckets, time.Now().UTC()) + }, func(_ error) bool { errCt++ return errCt <= maxErrs @@ -390,7 +432,7 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca return s.poolDo(ctx, func(client logproto.IndexGatewayClient) error { return s.clientDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback, client) - }) + }, noFilter) } @@ -428,13 +470,14 @@ func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*l // poolDo executes the given function for each Index Gateway instance in the ring mapping to the correct tenant in the index. // In case of callback failure, we'll try another member of the ring for that tenant ID. 
-func (s *GatewayClient) poolDo(ctx context.Context, callback func(client logproto.IndexGatewayClient) error) error { - return s.poolDoWithStrategy(ctx, callback, func(error) bool { return true }) +func (s *GatewayClient) poolDo(ctx context.Context, callback func(client logproto.IndexGatewayClient) error, filterServerList func([]string) []string) error { + return s.poolDoWithStrategy(ctx, callback, filterServerList, func(error) bool { return true }) } func (s *GatewayClient) poolDoWithStrategy( ctx context.Context, callback func(client logproto.IndexGatewayClient) error, + filterServerList func([]string) []string, shouldRetry func(error) bool, ) error { userID, err := tenant.TenantID(ctx) @@ -451,6 +494,18 @@ func (s *GatewayClient) poolDoWithStrategy( return fmt.Errorf("no index gateway instances found for tenant %s", userID) } + if s.cfg.Mode == SimpleMode { + slices.Sort(addrs) + allAddr := strings.Join(addrs, ",") + addrs = filterServerList(addrs) + level.Debug(s.logger).Log("msg", "filtered list of index gateway instances", "all", allAddr, "filtered", strings.Join(addrs, ",")) + } + + // shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant. + rand.Shuffle(len(addrs), func(i, j int) { + addrs[i], addrs[j] = addrs[j], addrs[i] + }) + var lastErr error for _, addr := range addrs { if s.cfg.LogGatewayRequests { @@ -496,11 +551,6 @@ func (s *GatewayClient) getServerAddresses(tenantID string) ([]string, error) { addrs = s.dnsProvider.Addresses() } - // shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant. 
// addressesForQueryEndTime returns the sub-range of addrs that should serve a
// query whose end time is t.
//
// buckets is expected to hold negative offsets ordered from the boundary
// closest to now to the one furthest in the past (e.g. -168h, -336h, -504h).
// Each offset is added to now truncated to a multiple of 24h to obtain the
// boundary of a time bucket. The address list is split into len(buckets)+1
// consecutive ranges: the most recent bucket gets the first half of the
// addresses, every following bucket gets half as many as the previous one,
// and everything older than the last boundary gets the remaining addresses.
//
// Example with 3 buckets and 8 instances:
//
//	Bucket 0:  now       -> now - 7d  => addrs[0:4]
//	Bucket 1:  now - 7d  -> now - 14d => addrs[4:6]
//	Bucket 2:  now - 14d -> now - 21d => addrs[6:7]
//	Remainder: now - 21d -> now - Inf => addrs[7:8]
//
// addrs is returned unchanged when buckets is empty or when there are fewer
// than 2^len(buckets) addresses, because at least one range would be empty.
// The returned slice shares its backing array with addrs.
func addressesForQueryEndTime(addrs []string, t time.Time, buckets []time.Duration, now time.Time) []string {
	n := len(addrs)
	m := len(buckets)

	// If there are no buckets, return all addresses.
	if m < 1 {
		return addrs
	}

	// Bucketing needs at least 2^m addresses. n>>m == 0 is equivalent to
	// n < 2^m, but unlike 1<<m it cannot overflow when m reaches the width of
	// int, which would let the check pass and produce empty address ranges.
	if n>>m == 0 {
		return addrs
	}

	today := now.Truncate(24 * time.Hour)
	start, end := 0, n>>1

	for i, offset := range buckets {
		// t is more recent than this boundary: the current range is the match.
		if t.After(today.Add(offset)) {
			break
		}

		// Advance to the next, older range, which is half the size of the
		// previous one: n / 2^(i+2).
		start = end
		end += n >> (i + 2)

		// Past the last boundary: the remainder takes all leftover addresses.
		if i == m-1 {
			end = n
		}
	}

	return addrs[start:end]
}

// noFilter is the identity server-list filter; it returns addrs unchanged.
func noFilter(addrs []string) []string { return addrs }
"127.0.0.2"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := addressesForQueryEndTime(addrs, tt.t, buckets, now) + require.Equal(t, tt.want, got) + }) + } + }) + + t.Run("empty address list", func(t *testing.T) { + addrs := []string{} + buckets := []time.Duration{-168 * time.Hour, -336 * time.Hour, -504 * time.Hour} + + tests := []struct { + name string + t time.Time + want []string + }{ + { + name: "first bucket", + t: now.Add(-1 * time.Hour), + want: []string{}, + }, + { + name: "third bucket", + t: now.Add(-400 * time.Hour), + want: []string{}, + }, + { + name: "inf bucket", + t: now.Add(-600 * time.Hour), + want: []string{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := addressesForQueryEndTime(addrs, tt.t, buckets, now) + require.Equal(t, tt.want, got) + }) + } + }) + + t.Run("address list smaller than pow(2, len(buckets))", func(t *testing.T) { + addrs := []string{"127.0.0.1", "127.0.0.2"} + buckets := []time.Duration{-168 * time.Hour, -336 * time.Hour, -504 * time.Hour} + + tests := []struct { + name string + t time.Time + want []string + }{ + { + name: "first bucket", + t: now.Add(-1 * time.Hour), + want: []string{"127.0.0.1", "127.0.0.2"}, + }, + { + name: "third bucket", + t: now.Add(-400 * time.Hour), + want: []string{"127.0.0.1", "127.0.0.2"}, + }, + { + name: "inf bucket", + t: now.Add(-600 * time.Hour), + want: []string{"127.0.0.1", "127.0.0.2"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := addressesForQueryEndTime(addrs, tt.t, buckets, now) + require.Equal(t, tt.want, got) + }) + } + }) + + t.Run("address list equal to pow(2, len(buckets))", func(t *testing.T) { + addrs := []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8"} + buckets := []time.Duration{-168 * time.Hour, -336 * time.Hour, -504 * time.Hour} + + tests := []struct { + name string + t time.Time + want []string + 
}{ + { + name: "first bucket", + t: now.Add(-1 * time.Hour), + want: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}, + }, + { + name: "second bucket", + t: now.Add(-335 * time.Hour), + want: []string{"127.0.0.5", "127.0.0.6"}, + }, + { + name: "third bucket", + t: now.Add(-400 * time.Hour), + want: []string{"127.0.0.7"}, + }, + { + name: "inf bucket", + t: now.Add(-600 * time.Hour), + want: []string{"127.0.0.8"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := addressesForQueryEndTime(addrs, tt.t, buckets, now) + require.Equal(t, tt.want, got) + }) + } + }) + + t.Run("address list greather than pow(2, len(buckets))", func(t *testing.T) { + addrs := []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.0.10", "127.0.0.11"} + buckets := []time.Duration{-168 * time.Hour, -336 * time.Hour, -504 * time.Hour} + + tests := []struct { + name string + t time.Time + want []string + }{ + { + name: "first bucket", + t: now.Add(-1 * time.Hour), + want: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"}, + }, + { + name: "second bucket", + t: now.Add(-335 * time.Hour), + want: []string{"127.0.0.6", "127.0.0.7"}, + }, + { + name: "third bucket", + t: now.Add(-400 * time.Hour), + want: []string{"127.0.0.8"}, + }, + { + name: "inf bucket", + t: now.Add(-600 * time.Hour), + want: []string{"127.0.0.9", "127.0.0.10", "127.0.0.11"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := addressesForQueryEndTime(addrs, tt.t, buckets, now) + require.Equal(t, tt.want, got) + }) + } + }) +}