feat: return ReasonFailed from ingest-limits frontend (#18123)

pull/18120/head^2
George Robinson 12 months ago committed by GitHub
parent b15d85da9b
commit c42ccc3248
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 54
      pkg/limits/frontend/frontend.go
  2. 49
      pkg/limits/frontend/frontend_test.go
  3. 25
      pkg/limits/frontend/http_test.go
  4. 9
      pkg/limits/frontend/mock_test.go
  5. 84
      pkg/limits/frontend/ring.go
  6. 7
      pkg/limits/frontend/ring_test.go

@ -14,7 +14,9 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/v3/pkg/limits"
limits_client "github.com/grafana/loki/v3/pkg/limits/client"
"github.com/grafana/loki/v3/pkg/limits/proto"
)
@ -32,6 +34,11 @@ type Frontend struct {
subservicesWatcher *services.FailureWatcher
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher
// Metrics.
streams prometheus.Counter
streamsFailed prometheus.Counter
streamsRejected prometheus.Counter
}
// New returns a new Frontend.
@ -62,6 +69,24 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge
logger: logger,
gatherer: gatherer,
assignedPartitionsCache: assignedPartitionsCache,
streams: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_total",
Help: "The total number of received streams.",
},
),
streamsFailed: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_failed_total",
Help: "The total number of received streams that could not be checked.",
},
),
streamsRejected: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_rejected_total",
Help: "The total number of rejected streams.",
},
),
}
lifecycler, err := ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
@ -89,13 +114,32 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, logger log.Logge
// ExceedsLimits implements proto.IngestLimitsFrontendClient.
func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
f.streams.Add(float64(len(req.Streams)))
results := make([]*proto.ExceedsLimitsResult, 0, len(req.Streams))
resps, err := f.gatherer.ExceedsLimits(ctx, req)
if err != nil {
return nil, err
}
results := make([]*proto.ExceedsLimitsResult, 0, len(req.Streams))
for _, resp := range resps {
results = append(results, resp.Results...)
// If the entire call failed, then all streams failed.
for _, stream := range req.Streams {
results = append(results, &proto.ExceedsLimitsResult{
StreamHash: stream.StreamHash,
Reason: uint32(limits.ReasonFailed),
})
}
f.streamsFailed.Add(float64(len(req.Streams)))
level.Error(f.logger).Log("msg", "failed to check request against limits", "err", err)
} else {
for _, resp := range resps {
for _, res := range resp.Results {
// Even if the call succeeded, some (or all) streams might
// still have failed.
if res.Reason == uint32(limits.ReasonFailed) {
f.streamsFailed.Inc()
} else {
f.streamsRejected.Inc()
}
}
results = append(results, resp.Results...)
}
}
return &proto.ExceedsLimitsResponse{Results: results}, nil
}

@ -2,9 +2,14 @@ package frontend
import (
"context"
"errors"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/limits"
@ -16,6 +21,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
name string
exceedsLimitsRequest *proto.ExceedsLimitsRequest
exceedsLimitsResponses []*proto.ExceedsLimitsResponse
err error
expected *proto.ExceedsLimitsResponse
}{{
name: "no streams",
@ -124,16 +130,49 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
Reason: uint32(limits.ReasonMaxStreams),
}},
},
}, {
name: "unexpected error, response with failed reason",
exceedsLimitsRequest: &proto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 0x1,
TotalSize: 0x5,
}, {
StreamHash: 0x2,
TotalSize: 0x9,
}},
},
err: errors.New("an unexpected error occurred"),
expected: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonFailed),
}, {
StreamHash: 0x2,
Reason: uint32(limits.ReasonFailed),
}},
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
f := Frontend{
gatherer: &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.exceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
readRing, _ := newMockRingWithClientPool(t, "test", nil, nil)
f, err := New(Config{
LifecyclerConfig: ring.LifecyclerConfig{
RingConfig: ring.Config{
KVStore: kv.Config{
Store: "inmemory",
},
},
},
}, "test", readRing, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
// Replace with our mock.
f.gatherer = &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.exceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
err: test.err,
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

@ -8,6 +8,10 @@ import (
"net/http/httptest"
"testing"
"github.com/go-kit/log"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/limits"
@ -71,14 +75,23 @@ func TestFrontend_ServeHTTP(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
f := Frontend{
gatherer: &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.expectedExceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
readRing, _ := newMockRingWithClientPool(t, "test", nil, nil)
f, err := New(Config{
LifecyclerConfig: ring.LifecyclerConfig{
RingConfig: ring.Config{
KVStore: kv.Config{
Store: "inmemory",
},
},
},
}, "test", readRing, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
f.gatherer = &mockExceedsLimitsGatherer{
t: t,
expectedExceedsLimitsRequest: test.expectedExceedsLimitsRequest,
exceedsLimitsResponses: test.exceedsLimitsResponses,
}
ts := httptest.NewServer(&f)
ts := httptest.NewServer(f)
defer ts.Close()
b, err := json.Marshal(test.request)

@ -24,13 +24,14 @@ type mockExceedsLimitsGatherer struct {
expectedExceedsLimitsRequest *proto.ExceedsLimitsRequest
exceedsLimitsResponses []*proto.ExceedsLimitsResponse
err error
}
func (g *mockExceedsLimitsGatherer) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if expected := g.expectedExceedsLimitsRequest; expected != nil {
require.Equal(g.t, expected, req)
func (m *mockExceedsLimitsGatherer) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if expected := m.expectedExceedsLimitsRequest; expected != nil {
require.Equal(m.t, expected, req)
}
return g.exceedsLimitsResponses, nil
return m.exceedsLimitsResponses, m.err
}
// mockIngestLimitsClient mocks proto.IngestLimitsClient.

@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/limits"
"github.com/grafana/loki/v3/pkg/limits/proto"
)
@ -41,9 +42,6 @@ type ringGatherer struct {
zoneCmp func(a, b string) int
// Metrics.
streams prometheus.Counter
streamsFailed prometheus.Counter
streamsRejected prometheus.Counter
partitionsMissing *prometheus.CounterVec
}
@ -63,24 +61,6 @@ func newRingGatherer(
numPartitions: numPartitions,
assignedPartitionsCache: assignedPartitionsCache,
zoneCmp: defaultZoneCmp,
streams: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_total",
Help: "The total number of received streams.",
},
),
streamsFailed: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_failed_total",
Help: "The total number of received streams that could not be checked.",
},
),
streamsRejected: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_streams_rejected_total",
Help: "The total number of rejected streams.",
},
),
partitionsMissing: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "loki_ingest_limits_frontend_partitions_missing_total",
@ -92,16 +72,16 @@ func newRingGatherer(
}
// ExceedsLimits implements the [exceedsLimitsGatherer] interface.
func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
func (r *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if len(req.Streams) == 0 {
return nil, nil
}
rs, err := g.ring.GetAllHealthy(LimitsRead)
rs, err := r.ring.GetAllHealthy(LimitsRead)
if err != nil {
return nil, err
}
// Get the partition consumers for each zone.
zonesPartitions, err := g.getZoneAwarePartitionConsumers(ctx, rs.Instances)
zonesPartitions, err := r.getZoneAwarePartitionConsumers(ctx, rs.Instances)
if err != nil {
return nil, err
}
@ -113,12 +93,11 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
for zone := range zonesPartitions {
zonesToQuery = append(zonesToQuery, zone)
}
slices.SortFunc(zonesToQuery, g.zoneCmp)
slices.SortFunc(zonesToQuery, r.zoneCmp)
// Make a copy of the streams from the request. We will prune this slice
// each time we receive the responses from a zone.
streams := make([]*proto.StreamMetadata, 0, len(req.Streams))
streams = append(streams, req.Streams...)
g.streams.Add(float64(len(streams)))
// Query each zone as ordered in zonesToQuery. If a zone answers all
// streams, the request is satisfied and there is no need to query
// subsequent zones. If a zone answers just a subset of streams
@ -129,7 +108,11 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
// zones.
responses := make([]*proto.ExceedsLimitsResponse, 0)
for _, zone := range zonesToQuery {
resps, answered, err := g.doExceedsLimitsRPCs(ctx, req.Tenant, streams, zonesPartitions[zone], zone)
// All streams been checked against per-tenant limits.
if len(streams) == 0 {
break
}
resps, answered, err := r.doExceedsLimitsRPCs(ctx, req.Tenant, streams, zonesPartitions[zone], zone)
if err != nil {
continue
}
@ -145,26 +128,29 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
})
return i < len(answered) && answered[i] == stream.StreamHash
})
// All streams been checked against per-tenant limits.
if len(streams) == 0 {
break
}
}
for _, resp := range responses {
g.streamsRejected.Add(float64(len(resp.Results)))
// Any unanswered streams after exhausting all zones must be failed.
if len(streams) > 0 {
failed := make([]*proto.ExceedsLimitsResult, 0, len(streams))
for _, stream := range streams {
failed = append(failed, &proto.ExceedsLimitsResult{
StreamHash: stream.StreamHash,
Reason: uint32(limits.ReasonFailed),
})
}
responses = append(responses, &proto.ExceedsLimitsResponse{Results: failed})
}
g.streamsFailed.Add(float64(len(streams)))
return responses, nil
}
func (g *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, streams []*proto.StreamMetadata, partitions map[int32]string, zone string) ([]*proto.ExceedsLimitsResponse, []uint64, error) {
func (r *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, streams []*proto.StreamMetadata, partitions map[int32]string, zone string) ([]*proto.ExceedsLimitsResponse, []uint64, error) {
// For each stream, figure out which instance consume its partition.
instancesForStreams := make(map[string][]*proto.StreamMetadata)
for _, stream := range streams {
partition := int32(stream.StreamHash % uint64(g.numPartitions))
partition := int32(stream.StreamHash % uint64(r.numPartitions))
addr, ok := partitions[partition]
if !ok {
g.partitionsMissing.WithLabelValues(zone).Inc()
r.partitionsMissing.WithLabelValues(zone).Inc()
continue
}
instancesForStreams[addr] = append(instancesForStreams[addr], stream)
@ -174,9 +160,9 @@ func (g *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, s
answeredCh := make(chan uint64, len(streams))
for addr, streams := range instancesForStreams {
errg.Go(func() error {
client, err := g.pool.GetClientFor(addr)
client, err := r.pool.GetClientFor(addr)
if err != nil {
level.Error(g.logger).Log("msg", "failed to get client for instance", "instance", addr, "err", err.Error())
level.Error(r.logger).Log("msg", "failed to get client for instance", "instance", addr, "err", err.Error())
return nil
}
resp, err := client.(proto.IngestLimitsClient).ExceedsLimits(ctx, &proto.ExceedsLimitsRequest{
@ -184,7 +170,7 @@ func (g *ringGatherer) doExceedsLimitsRPCs(ctx context.Context, tenant string, s
Streams: streams,
})
if err != nil {
level.Error(g.logger).Log("failed check execeed limits for instance", "instance", addr, "err", err.Error())
level.Error(r.logger).Log("failed check execeed limits for instance", "instance", addr, "err", err.Error())
return nil
}
responseCh <- resp
@ -218,7 +204,7 @@ type zonePartitionConsumersResult struct {
// zone will still be returned but its partition consumers will be nil.
// If ZoneAwarenessEnabled is false, it returns all partition consumers under
// a pseudo-zone ("").
func (g *ringGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
func (r *ringGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
zoneDescs := make(map[string][]ring.InstanceDesc)
for _, instance := range instances {
zoneDescs[instance.Zone] = append(zoneDescs[instance.Zone], instance)
@ -228,9 +214,9 @@ func (g *ringGatherer) getZoneAwarePartitionConsumers(ctx context.Context, insta
errg, ctx := errgroup.WithContext(ctx)
for zone, instances := range zoneDescs {
errg.Go(func() error {
res, err := g.getPartitionConsumers(ctx, instances)
res, err := r.getPartitionConsumers(ctx, instances)
if err != nil {
level.Error(g.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
level.Error(r.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
}
// Even if the consumers could not be fetched for a zone, we
// should still return the zone.
@ -271,7 +257,7 @@ type getAssignedPartitionsResponse struct {
// should be called once for each zone, and instances should be filtered to
// the respective zone. Alternatively, you can pass all instances for all zones
// to find the most up to date consumer for each partition across all zones.
func (g *ringGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
func (r *ringGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
errg, ctx := errgroup.WithContext(ctx)
responseCh := make(chan getAssignedPartitionsResponse, len(instances))
for _, instance := range instances {
@ -279,24 +265,24 @@ func (g *ringGatherer) getPartitionConsumers(ctx context.Context, instances []ri
// We use a cache to eliminate redundant gRPC requests for
// GetAssignedPartitions as the set of assigned partitions is
// expected to be stable outside consumer rebalances.
if resp, ok := g.assignedPartitionsCache.Get(instance.Addr); ok {
if resp, ok := r.assignedPartitionsCache.Get(instance.Addr); ok {
responseCh <- getAssignedPartitionsResponse{
addr: instance.Addr,
response: resp,
}
return nil
}
client, err := g.pool.GetClientFor(instance.Addr)
client, err := r.pool.GetClientFor(instance.Addr)
if err != nil {
level.Error(g.logger).Log("failed to get client for instance", "instance", instance.Addr, "err", err.Error())
level.Error(r.logger).Log("failed to get client for instance", "instance", instance.Addr, "err", err.Error())
return nil
}
resp, err := client.(proto.IngestLimitsClient).GetAssignedPartitions(ctx, &proto.GetAssignedPartitionsRequest{})
if err != nil {
level.Error(g.logger).Log("failed to get assigned partitions for instance", "instance", instance.Addr, "err", err.Error())
level.Error(r.logger).Log("failed to get assigned partitions for instance", "instance", instance.Addr, "err", err.Error())
return nil
}
g.assignedPartitionsCache.Set(instance.Addr, resp)
r.assignedPartitionsCache.Set(instance.Addr, resp)
responseCh <- getAssignedPartitionsResponse{
addr: instance.Addr,
response: resp,

@ -283,7 +283,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
}},
}, {
// When one instance returns an error, the streams for that instance
// are permitted.
// are failed.
name: "two streams, two instances, one instance returns error",
request: &proto.ExceedsLimitsRequest{
Tenant: "test",
@ -338,6 +338,11 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
StreamHash: 0x2,
Reason: uint32(limits.ReasonMaxStreams),
}},
}, {
Results: []*proto.ExceedsLimitsResult{{
StreamHash: 0x1,
Reason: uint32(limits.ReasonFailed),
}},
}},
}, {
// When one zone returns an error, the streams for that instance

Loading…
Cancel
Save