Fix panic in bloom gateway client (#11537)

The bloom gateway client panicked, because the ring, from which it gets
the subring for the tenant was never passed as argument in the
constructor, even though it was part of the client struct.

---------

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/11505/head
Christian Haudum 1 year ago committed by GitHub
parent 26432c0bf8
commit 55fa0d19e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      pkg/bloomgateway/client.go
  2. 17
      pkg/bloomgateway/client_test.go
  3. 2
      pkg/bloomgateway/querier.go
  4. 6
      pkg/bloomgateway/querier_test.go
  5. 5
      pkg/loki/modules.go

@ -149,8 +149,9 @@ type GatewayClient struct {
ring ring.ReadRing
}
func NewGatewayClient(
func NewClient(
cfg ClientConfig,
readRing ring.ReadRing,
limits Limits,
registerer prometheus.Registerer,
logger log.Logger,
@ -207,6 +208,7 @@ func NewGatewayClient(
logger: logger,
limits: limits,
pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace),
ring: readRing,
}, nil
}

@ -1,6 +1,7 @@
package bloomgateway
import (
"context"
"math"
"sort"
"testing"
@ -10,6 +11,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/bloomutils"
@ -21,15 +23,18 @@ func TestBloomGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1}, nil)
l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1, BloomGatewayEnabled: true}, nil)
require.NoError(t, err)
cfg := ClientConfig{}
flagext.DefaultValues(&cfg)
t.Run("", func(t *testing.T) {
_, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false)
t.Run("FilterChunks returns response", func(t *testing.T) {
c, err := NewClient(cfg, &mockRing{}, l, reg, logger, "loki", nil, false)
require.NoError(t, err)
res, err := c.FilterChunks(context.Background(), "tenant", model.Now(), model.Now(), nil)
require.NoError(t, err)
require.Equal(t, []*logproto.GroupedChunkRefs{}, res)
})
}
@ -212,7 +217,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
cfg := ClientConfig{}
flagext.DefaultValues(&cfg)
c, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false)
c, err := NewClient(cfg, nil, l, reg, logger, "loki", nil, false)
require.NoError(t, err)
instances := []ring.InstanceDesc{
@ -408,8 +413,8 @@ func (*mockRing) ReplicationFactor() int {
}
// ShuffleShard implements ring.ReadRing.
func (*mockRing) ShuffleShard(_ string, _ int) ring.ReadRing {
panic("unimplemented")
func (r *mockRing) ShuffleShard(_ string, _ int) ring.ReadRing {
return r
}
// ShuffleShardWithLookback implements ring.ReadRing.

@ -18,7 +18,7 @@ type BloomQuerier struct {
logger log.Logger
}
func NewBloomQuerier(c Client, logger log.Logger) *BloomQuerier {
func NewQuerier(c Client, logger log.Logger) *BloomQuerier {
return &BloomQuerier{c: c, logger: logger}
}

@ -32,7 +32,7 @@ func TestBloomQuerier(t *testing.T) {
t.Run("client not called when filters are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewBloomQuerier(c, logger)
bq := NewQuerier(c, logger)
ctx := context.Background()
through := model.Now()
@ -51,7 +51,7 @@ func TestBloomQuerier(t *testing.T) {
t.Run("client not called when chunkRefs are empty", func(t *testing.T) {
c := &noopClient{}
bq := NewBloomQuerier(c, logger)
bq := NewQuerier(c, logger)
ctx := context.Background()
through := model.Now()
@ -68,7 +68,7 @@ func TestBloomQuerier(t *testing.T) {
t.Run("querier propagates error from client", func(t *testing.T) {
c := &noopClient{err: errors.New("something went wrong")}
bq := NewBloomQuerier(c, logger)
bq := NewQuerier(c, logger)
ctx := context.Background()
through := model.Now()

@ -1337,8 +1337,9 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
var bloomQuerier indexgateway.BloomQuerier
if t.Cfg.BloomGateway.Enabled {
bloomGatewayClient, err := bloomgateway.NewGatewayClient(
bloomGatewayClient, err := bloomgateway.NewClient(
t.Cfg.BloomGateway.Client,
t.bloomGatewayRingManager.Ring,
t.Overrides,
prometheus.DefaultRegisterer,
logger,
@ -1349,7 +1350,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
if err != nil {
return nil, err
}
bloomQuerier = bloomgateway.NewBloomQuerier(bloomGatewayClient, logger)
bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, logger)
}
gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, logger, prometheus.DefaultRegisterer, t.Store, indexClients, bloomQuerier)

Loading…
Cancel
Save