diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 3aa5a7356f..6453987b91 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -149,8 +149,9 @@ type GatewayClient struct { ring ring.ReadRing } -func NewGatewayClient( +func NewClient( cfg ClientConfig, + readRing ring.ReadRing, limits Limits, registerer prometheus.Registerer, logger log.Logger, @@ -207,6 +208,7 @@ func NewGatewayClient( logger: logger, limits: limits, pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace), + ring: readRing, }, nil } diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index 6edd8fcb40..0811997d79 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -1,6 +1,7 @@ package bloomgateway import ( + "context" "math" "sort" "testing" @@ -10,6 +11,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/ring" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/bloomutils" @@ -21,15 +23,18 @@ func TestBloomGatewayClient(t *testing.T) { logger := log.NewNopLogger() reg := prometheus.NewRegistry() - l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1}, nil) + l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1, BloomGatewayEnabled: true}, nil) require.NoError(t, err) cfg := ClientConfig{} flagext.DefaultValues(&cfg) - t.Run("", func(t *testing.T) { - _, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false) + t.Run("FilterChunks returns response", func(t *testing.T) { + c, err := NewClient(cfg, &mockRing{}, l, reg, logger, "loki", nil, false) + require.NoError(t, err) + res, err := c.FilterChunks(context.Background(), "tenant", model.Now(), model.Now(), nil) require.NoError(t, err) + require.Equal(t, []*logproto.GroupedChunkRefs{}, res) }) } @@ -212,7 +217,7 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) { cfg := ClientConfig{} flagext.DefaultValues(&cfg) - c, err := NewGatewayClient(cfg, l, reg, logger, "loki", nil, false) + c, err := NewClient(cfg, nil, l, reg, logger, "loki", nil, false) require.NoError(t, err) instances := []ring.InstanceDesc{ @@ -408,8 +413,8 @@ func (*mockRing) ReplicationFactor() int { } // ShuffleShard implements ring.ReadRing. -func (*mockRing) ShuffleShard(_ string, _ int) ring.ReadRing { - panic("unimplemented") +func (r *mockRing) ShuffleShard(_ string, _ int) ring.ReadRing { + return r } // ShuffleShardWithLookback implements ring.ReadRing. diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index d55176ed03..4b2366e83f 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -18,7 +18,7 @@ type BloomQuerier struct { logger log.Logger } -func NewBloomQuerier(c Client, logger log.Logger) *BloomQuerier { +func NewQuerier(c Client, logger log.Logger) *BloomQuerier { return &BloomQuerier{c: c, logger: logger} } diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index 66390d7ec4..1e7cfc30a5 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -32,7 +32,7 @@ func TestBloomQuerier(t *testing.T) { t.Run("client not called when filters are empty", func(t *testing.T) { c := &noopClient{} - bq := NewBloomQuerier(c, logger) + bq := NewQuerier(c, logger) ctx := context.Background() through := model.Now() @@ -51,7 +51,7 @@ func TestBloomQuerier(t *testing.T) { t.Run("client not called when chunkRefs are empty", func(t *testing.T) { c := &noopClient{} - bq := NewBloomQuerier(c, logger) + bq := NewQuerier(c, logger) ctx := context.Background() through := model.Now() @@ -68,7 +68,7 @@ func TestBloomQuerier(t *testing.T) { t.Run("querier propagates error from client", func(t *testing.T) { c := &noopClient{err: errors.New("something went wrong")} - bq := NewBloomQuerier(c, logger) + bq := NewQuerier(c, logger) ctx := context.Background() through := model.Now() diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5cf85e2f3e..68384086ab 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1337,8 +1337,9 @@ func (t *Loki) initIndexGateway() (services.Service, error) { var bloomQuerier indexgateway.BloomQuerier if t.Cfg.BloomGateway.Enabled { - bloomGatewayClient, err := bloomgateway.NewGatewayClient( + bloomGatewayClient, err := bloomgateway.NewClient( t.Cfg.BloomGateway.Client, + t.bloomGatewayRingManager.Ring, t.Overrides, prometheus.DefaultRegisterer, logger, @@ -1349,7 +1350,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) { if err != nil { return nil, err } - bloomQuerier = bloomgateway.NewBloomQuerier(bloomGatewayClient, logger) + bloomQuerier = bloomgateway.NewQuerier(bloomGatewayClient, logger) } gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, logger, prometheus.DefaultRegisterer, t.Store, indexClients, bloomQuerier)