chore: refactor how distributor checks ingest-limits (#17042)

pull/17048/head
George Robinson 3 months ago committed by GitHub
parent e90e0c41fb
commit ffa9656f7f
  1. pkg/distributor/distributor.go (158)
  2. pkg/distributor/distributor_test.go (65)
  3. pkg/distributor/ingest_limits.go (154)
  4. pkg/distributor/ingest_limits_test.go (124)

@@ -2,10 +2,8 @@ package distributor
import (
"context"
"encoding/binary"
"flag"
"fmt"
"hash/fnv"
"math"
"net/http"
"runtime/pprof"
@@ -198,9 +196,7 @@ type Distributor struct {
ingesterTaskWg sync.WaitGroup
// Will succeed usage tracker in future.
limitsFrontendRing ring.ReadRing
limitsFrontends *ring_client.Pool
limitsFailures prometheus.Counter
ingestLimits *ingestLimits
// kafka
kafkaWriter KafkaProducer
@@ -256,6 +252,17 @@ func New(
}
limitsFrontendClientFactory := limits_frontend_client.NewPoolFactory(limitsFrontendCfg)
limitsFrontendClientPool := limits_frontend_client.NewPool(
limits_frontend.RingName,
limitsFrontendCfg.PoolConfig,
limitsFrontendRing,
limitsFrontendClientFactory,
logger,
)
limitsFrontendClient := newIngestLimitsFrontendRingClient(
limitsFrontendRing,
limitsFrontendClientPool,
)
// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
@@ -336,10 +343,6 @@ func New(
Name: "distributor_push_structured_metadata_sanitized_total",
Help: "The total number of times we've had to sanitize structured metadata (names or values) at ingestion time per tenant.",
}, []string{"tenant"}),
limitsFailures: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_failures_total",
Help: "The total number of failures checking per-tenant ingest limits.",
}),
kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_kafka_appends_total",
@@ -368,14 +371,7 @@ func New(
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
limitsFrontendRing: limitsFrontendRing,
limitsFrontends: limits_frontend_client.NewPool(
limits_frontend.RingName,
limitsFrontendCfg.PoolConfig,
limitsFrontendRing,
limitsFrontendClientFactory,
logger,
),
ingestLimits: newIngestLimits(limitsFrontendClient, registerer),
numMetadataPartitions: numMetadataPartitions,
}
@@ -726,15 +722,16 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
}
if d.cfg.IngestLimitsEnabled {
exceedsLimits, reasons, err := d.exceedsLimits(ctx, tenantID, streams, d.doExceedsLimitsRPC)
exceedsLimits, _, err := d.ingestLimits.ExceedsLimits(ctx, tenantID, streams)
if err != nil {
d.limitsFailures.Inc()
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
} else if exceedsLimits {
if d.cfg.IngestLimitsDryRunEnabled {
level.Debug(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
} else {
return nil, httpgrpc.Error(http.StatusBadRequest, strings.Join(reasons, ","))
// TODO(grobinson): This will be removed, as we only want to fail the request
// when specific limits are exceeded.
return nil, httpgrpc.Error(http.StatusBadRequest, "request exceeded limits")
}
}
}
@@ -1185,127 +1182,6 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
}
}
// exceedsLimits returns true if the request exceeds the per-tenant limits,
// otherwise false. If the request does exceed per-tenant limits, a list of
// reasons is returned explaining which limits were exceeded. An error is
// returned if the limits could not be checked.
func (d *Distributor) exceedsLimits(
ctx context.Context,
tenantID string,
streams []KeyedStream,
doExceedsLimitsFn doExceedsLimitsFunc,
) (bool, []string, error) {
if !d.cfg.IngestLimitsEnabled {
return false, nil, nil
}
resp, err := doExceedsLimitsFn(ctx, tenantID, streams)
if err != nil {
return false, nil, err
}
if len(resp.Results) == 0 {
return false, nil, nil
}
// hashesToLabels memoizes the labels for a stream hash so we can add
// it to the reason.
hashesToLabels := make(map[uint64]string)
for _, s := range streams {
hashesToLabels[s.HashKeyNoShard] = s.Stream.Labels
}
reasons := make([]string, 0, len(resp.Results))
for _, result := range resp.Results {
reasons = append(reasons, fmt.Sprintf(
"stream %s was rejected because %q",
hashesToLabels[result.StreamHash],
result.Reason,
))
}
return true, reasons, nil
}
// doExceedsLimitsFunc enables stubbing out doExceedsLimitsRPC for tests.
type doExceedsLimitsFunc func(
ctx context.Context,
tenantID string,
streams []KeyedStream,
) (*logproto.ExceedsLimitsResponse, error)
// doExceedsLimitsRPC executes an RPC to the limits-frontend service to check
// if per-tenant limits have been exceeded. If an RPC call returns an error,
// it fails over to the next limits-frontend service. The failover is repeated
// until there are no more replicas remaining or the context is canceled,
// whichever happens first.
func (d *Distributor) doExceedsLimitsRPC(
ctx context.Context,
tenantID string,
streams []KeyedStream,
) (*logproto.ExceedsLimitsResponse, error) {
// We use an FNV-1 of all stream hashes in the request to load balance requests
// to limits-frontend instances.
h := fnv.New32()
// The distributor sends the hashes of all streams in the request to the
// limits-frontend. The limits-frontend is responsible for deciding if
// the request would exceed the tenant's limits, and if so, which streams
// from the request caused it to exceed its limits.
streamMetadata := make([]*logproto.StreamMetadata, 0, len(streams))
for _, stream := range streams {
// Add the stream hash to FNV-1.
buf := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(buf, stream.HashKeyNoShard)
_, _ = h.Write(buf)
// Calculate the size of the stream.
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
// Add the stream hash to the request. This is sent to limits-frontend.
streamMetadata = append(streamMetadata, &logproto.StreamMetadata{
StreamHash: stream.HashKeyNoShard,
EntriesSize: entriesSize,
StructuredMetadataSize: structuredMetadataSize,
})
}
req := logproto.ExceedsLimitsRequest{
Tenant: tenantID,
Streams: streamMetadata,
}
// Get the limits-frontend instances from the ring.
var descs [5]ring.InstanceDesc
rs, err := d.limitsFrontendRing.Get(h.Sum32(), limits_frontend_client.LimitsRead, descs[0:], nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
}
var lastErr error
// Send the request to the limits-frontend to see if it exceeds the tenant
// limits. If the RPC fails, failover to the next instance in the ring.
for _, instance := range rs.Instances {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
c, err := d.limitsFrontends.GetClientFor(instance.Addr)
if err != nil {
lastErr = err
continue
}
client := c.(logproto.IngestLimitsFrontendClient)
resp, err := client.ExceedsLimits(ctx, &req)
if err != nil {
lastErr = err
continue
}
return resp, nil
}
return nil, lastErr
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
c, err := d.ingesterClients.GetClientFor(ingester.Addr)

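As an aside, here is a standalone sketch of the ring-key computation that the removed doExceedsLimitsRPC performs (and that the new ring client in ingest_limits.go repeats): every stream hash in the request is varint-encoded and fed into a 32-bit FNV-1 hash, and the resulting sum is used to pick the limits-frontend replica set from the ring. The helper and package names below are illustrative only, not part of the change.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// limitsFrontendRingKey mirrors the hashing in the diff: each stream hash is
// written as an unsigned varint into a zero-padded MaxVarintLen64 buffer and
// fed into a 32-bit FNV-1 hash. The same set of stream hashes therefore always
// produces the same ring token, so requests for the same streams land on the
// same limits-frontend instances.
func limitsFrontendRingKey(streamHashes []uint64) uint32 {
	h := fnv.New32()
	for _, hash := range streamHashes {
		buf := make([]byte, binary.MaxVarintLen64)
		binary.PutUvarint(buf, hash)
		_, _ = h.Write(buf)
	}
	return h.Sum32()
}

func main() {
	// The returned key is what the diff passes to ring.Get(key, LimitsRead, ...).
	fmt.Println(limitsFrontendRingKey([]uint64{1, 2, 3}))
}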
@@ -2,7 +2,6 @@ package distributor
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
@@ -2388,67 +2387,3 @@ func TestRequestScopedStreamResolver(t *testing.T) {
policy = newResolver.PolicyFor(labels.FromStrings("env", "dev"))
require.Equal(t, "policy1", policy)
}
func TestExceedsLimits(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
distributors, _ := prepare(t, 1, 0, limits, nil)
d := distributors[0]
ctx := context.Background()
streams := []KeyedStream{{
HashKeyNoShard: 1,
Stream: logproto.Stream{
Labels: "{foo=\"bar\"}",
},
}}
t.Run("no limits should be checked when disabled", func(t *testing.T) {
d.cfg.IngestLimitsEnabled = false
doExceedsLimitsFn := func(_ context.Context, _ string, _ []KeyedStream) (*logproto.ExceedsLimitsResponse, error) {
t.Fail() // Should not be called.
return nil, nil
}
exceedsLimits, reasons, err := d.exceedsLimits(ctx, "test", streams, doExceedsLimitsFn)
require.Nil(t, err)
require.False(t, exceedsLimits)
require.Nil(t, reasons)
})
t.Run("error should be returned if limits cannot be checked", func(t *testing.T) {
d.cfg.IngestLimitsEnabled = true
doExceedsLimitsFn := func(_ context.Context, _ string, _ []KeyedStream) (*logproto.ExceedsLimitsResponse, error) {
return nil, errors.New("failed to check limits")
}
exceedsLimits, reasons, err := d.exceedsLimits(ctx, "test", streams, doExceedsLimitsFn)
require.EqualError(t, err, "failed to check limits")
require.False(t, exceedsLimits)
require.Nil(t, reasons)
})
t.Run("stream exceeds limits", func(t *testing.T) {
doExceedsLimitsFn := func(_ context.Context, _ string, _ []KeyedStream) (*logproto.ExceedsLimitsResponse, error) {
return &logproto.ExceedsLimitsResponse{
Tenant: "test",
Results: []*logproto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: "test",
}},
}, nil
}
exceedsLimits, reasons, err := d.exceedsLimits(ctx, "test", streams, doExceedsLimitsFn)
require.Nil(t, err)
require.True(t, exceedsLimits)
require.Equal(t, []string{"stream {foo=\"bar\"} was rejected because \"test\""}, reasons)
})
t.Run("stream does not exceed limits", func(t *testing.T) {
doExceedsLimitsFn := func(_ context.Context, _ string, _ []KeyedStream) (*logproto.ExceedsLimitsResponse, error) {
return &logproto.ExceedsLimitsResponse{}, nil
}
exceedsLimits, reasons, err := d.exceedsLimits(ctx, "test", streams, doExceedsLimitsFn)
require.Nil(t, err)
require.False(t, exceedsLimits)
require.Nil(t, reasons)
})
}

@@ -0,0 +1,154 @@
package distributor
import (
"context"
"encoding/binary"
"fmt"
"hash/fnv"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
"github.com/grafana/loki/v3/pkg/logproto"
)
// ingestLimitsFrontendClient allows the limits-frontend client to be mocked in tests.
type ingestLimitsFrontendClient interface {
ExceedsLimits(context.Context, *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error)
}
// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
type ingestLimitsFrontendRingClient struct {
ring ring.ReadRing
pool *ring_client.Pool
}
func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Pool) *ingestLimitsFrontendRingClient {
return &ingestLimitsFrontendRingClient{
ring: ring,
pool: pool,
}
}
// Implements the ingestLimitsFrontendClient interface.
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error) {
// We use an FNV-1 of all stream hashes in the request to load balance requests
// to limits-frontend instances.
h := fnv.New32()
for _, stream := range req.Streams {
// Add the stream hash to FNV-1.
buf := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(buf, stream.StreamHash)
_, _ = h.Write(buf)
}
// Get the limits-frontend instances from the ring.
var descs [5]ring.InstanceDesc
rs, err := c.ring.Get(h.Sum32(), limits_frontend_client.LimitsRead, descs[0:], nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
}
var lastErr error
// Send the request to the limits-frontend to see if it exceeds the tenant
// limits. If the RPC fails, fail over to the next instance in the ring.
for _, instance := range rs.Instances {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
c, err := c.pool.GetClientFor(instance.Addr)
if err != nil {
lastErr = err
continue
}
client := c.(logproto.IngestLimitsFrontendClient)
resp, err := client.ExceedsLimits(ctx, req)
if err != nil {
lastErr = err
continue
}
return resp, nil
}
return nil, lastErr
}
// exceedsIngestLimitsResult contains the reasons a stream exceeds per-tenant
// ingest limits.
type exceedsIngestLimitsResult struct {
hash uint64
reasons []string
}
type ingestLimits struct {
client ingestLimitsFrontendClient
limitsFailures prometheus.Counter
}
func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits {
return &ingestLimits{
client: client,
limitsFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_failures_total",
Help: "The total number of failures checking ingest limits.",
}),
}
}
// ExceedsLimits returns true if at least one stream exceeds the per-tenant
// limits, otherwise false. It also returns a slice containing the streams
// that exceeded the per-tenant limits, and for each stream the reasons it
// exceeded the limits. This slice can be nil. An error is returned if the
// limits could not be checked.
func (l *ingestLimits) ExceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) (bool, []exceedsIngestLimitsResult, error) {
req, err := newExceedsLimitsRequest(tenant, streams)
if err != nil {
return false, nil, err
}
resp, err := l.client.ExceedsLimits(ctx, req)
if err != nil {
return false, nil, err
}
if len(resp.Results) == 0 {
return false, nil, nil
}
// A stream can exceed limits for multiple reasons. For example, exceeding
// both the per-tenant stream limit and rate limits. We organize the reasons
// for each stream into a slice, and then add that to the results.
reasonsForHashes := make(map[uint64][]string)
for _, result := range resp.Results {
reasons := reasonsForHashes[result.StreamHash]
reasons = append(reasons, result.Reason)
reasonsForHashes[result.StreamHash] = reasons
}
result := make([]exceedsIngestLimitsResult, 0, len(reasonsForHashes))
for hash, reasons := range reasonsForHashes {
result = append(result, exceedsIngestLimitsResult{
hash: hash,
reasons: reasons,
})
}
return true, result, nil
}
func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*logproto.ExceedsLimitsRequest, error) {
// The distributor sends the hashes of all streams in the request to the
// limits-frontend. The limits-frontend is responsible for deciding if
// the request would exceed the tenant's limits, and if so, which streams
// from the request caused it to exceed its limits.
streamMetadata := make([]*logproto.StreamMetadata, 0, len(streams))
for _, stream := range streams {
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
streamMetadata = append(streamMetadata, &logproto.StreamMetadata{
StreamHash: stream.HashKeyNoShard,
EntriesSize: entriesSize,
StructuredMetadataSize: structuredMetadataSize,
})
}
return &logproto.ExceedsLimitsRequest{
Tenant: tenant,
Streams: streamMetadata,
}, nil
}
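A minimal usage sketch of the new helper, assuming the distributor package types above plus go-kit logging; the wrapper function checkIngestLimits and its variable names are hypothetical, not part of the diff.

import (
	"context"
	"strings"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// checkIngestLimits shows how a caller consumes ExceedsLimits: if the check
// itself fails, the request is accepted and the error logged; otherwise each
// rejected stream is reported with every limit it exceeded.
func checkIngestLimits(ctx context.Context, logger log.Logger, limits *ingestLimits, tenant string, streams []KeyedStream) bool {
	exceeds, results, err := limits.ExceedsLimits(ctx, tenant, streams)
	if err != nil {
		level.Error(logger).Log("msg", "failed to check ingest limits, request accepted", "err", err)
		return false
	}
	for _, r := range results {
		// r.hash is the stream's HashKeyNoShard; r.reasons lists each limit it exceeded.
		level.Debug(logger).Log("msg", "stream exceeded limits", "hash", r.hash, "reasons", strings.Join(r.reasons, ","))
	}
	return exceeds
}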

@@ -0,0 +1,124 @@
package distributor
import (
"context"
"errors"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
)
// mockIngestLimitsFrontendClient mocks the RPC calls for tests.
type mockIngestLimitsFrontendClient struct {
t *testing.T
expectedRequest *logproto.ExceedsLimitsRequest
response *logproto.ExceedsLimitsResponse
responseErr error
}
// Implements the ingestLimitsFrontendClient interface.
func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error) {
require.Equal(c.t, c.expectedRequest, r)
if c.responseErr != nil {
return nil, c.responseErr
}
return c.response, nil
}
// This test asserts that when checking ingest limits the expected proto
// message is sent, and that for a given response, the result contains the
// expected streams each with their expected reasons.
func TestIngestLimits_ExceedsLimits(t *testing.T) {
tests := []struct {
name string
tenant string
streams []KeyedStream
expectedRequest *logproto.ExceedsLimitsRequest
response *logproto.ExceedsLimitsResponse
responseErr error
expected []exceedsIngestLimitsResult
expectedErr string
}{{
name: "error should be returned if limits cannot be checked",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
}},
expectedRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{{
StreamHash: 1,
}},
},
responseErr: errors.New("failed to check limits"),
expectedErr: "failed to check limits",
}, {
name: "exceeds limits",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
}},
expectedRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{{
StreamHash: 1,
}},
},
response: &logproto.ExceedsLimitsResponse{
Tenant: "test",
Results: []*logproto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: "test",
}},
},
expected: []exceedsIngestLimitsResult{{
hash: 1,
reasons: []string{"test"},
}},
}, {
name: "does not exceed limits",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
}},
expectedRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{{
StreamHash: 1,
}},
},
response: &logproto.ExceedsLimitsResponse{
Tenant: "test",
Results: []*logproto.ExceedsLimitsResult{},
},
expected: nil,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
mockClient := mockIngestLimitsFrontendClient{
t: t,
expectedRequest: test.expectedRequest,
response: test.response,
responseErr: test.responseErr,
}
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
exceedsLimits, rejectedStreams, err := l.ExceedsLimits(ctx, test.tenant, test.streams)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
require.False(t, exceedsLimits)
require.Empty(t, rejectedStreams)
} else {
require.Nil(t, err)
require.Equal(t, test.expected, rejectedStreams)
require.Equal(t, len(test.expected) > 0, exceedsLimits)
}
})
}
}
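The table above only covers single-reason results. Below is a hedged sketch of an extra case, reusing the mock defined in this file, that exercises the reason-grouping in ExceedsLimits when one stream is rejected for two reasons; the test name and reason strings are illustrative.

func TestIngestLimits_ExceedsLimits_GroupsReasons(t *testing.T) {
	mockClient := mockIngestLimitsFrontendClient{
		t: t,
		expectedRequest: &logproto.ExceedsLimitsRequest{
			Tenant:  "test",
			Streams: []*logproto.StreamMetadata{{StreamHash: 1}},
		},
		response: &logproto.ExceedsLimitsResponse{
			Tenant: "test",
			Results: []*logproto.ExceedsLimitsResult{
				{StreamHash: 1, Reason: "exceeds stream limit"},
				{StreamHash: 1, Reason: "exceeds rate limit"},
			},
		},
	}
	l := newIngestLimits(&mockClient, prometheus.NewRegistry())
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	exceedsLimits, results, err := l.ExceedsLimits(ctx, "test", []KeyedStream{{HashKeyNoShard: 1}})
	require.NoError(t, err)
	require.True(t, exceedsLimits)
	// Both reasons are grouped under the single stream hash, in response order.
	require.Equal(t, []exceedsIngestLimitsResult{{hash: 1, reasons: []string{"exceeds stream limit", "exceeds rate limit"}}}, results)
}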