From bc2111e8f10f3a73465c30a4ceffae39d87255d6 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Wed, 12 Mar 2025 13:43:38 +0000 Subject: [PATCH] feat: enforce limits in distributors (#16705) --- docs/sources/shared/configuration.md | 4 + pkg/distributor/distributor.go | 191 ++++++++++++++++++++++----- pkg/distributor/distributor_test.go | 14 +- pkg/loki/loki.go | 2 +- pkg/loki/modules.go | 3 + 5 files changed, 181 insertions(+), 33 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index a70d1c5bb6..a8a0a33e41 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2539,6 +2539,10 @@ otlp_config: # CLI flag: -distributor.ingester-writes-enabled [ingester_writes_enabled: | default = true] +# Enable checking limits against the ingest-limits service. Defaults to false. +# CLI flag: -distributor.ingest-limits-enabled +[ingest_limits_enabled: | default = false] + tenant_topic: # Enable the tenant topic tee, which writes logs to Kafka topics based on # tenant IDs instead of using multitenant topics/partitions. diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d0210827ee..7fc704e10d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -2,8 +2,10 @@ package distributor import ( "context" + "encoding/binary" "flag" "fmt" + "hash/fnv" "math" "net/http" "runtime/pprof" @@ -44,9 +46,11 @@ import ( "github.com/grafana/loki/v3/pkg/distributor/shardstreams" "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester" - "github.com/grafana/loki/v3/pkg/ingester/client" + ingester_client "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/kafka" kafka_client "github.com/grafana/loki/v3/pkg/kafka/client" + limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend" + limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -96,9 +100,11 @@ type Config struct { OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"` - KafkaEnabled bool `yaml:"kafka_writes_enabled"` - IngesterEnabled bool `yaml:"ingester_writes_enabled"` - KafkaConfig kafka.Config `yaml:"-"` + KafkaEnabled bool `yaml:"kafka_writes_enabled"` + IngesterEnabled bool `yaml:"ingester_writes_enabled"` + IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"` + + KafkaConfig kafka.Config `yaml:"-"` // TODO: cleanup config TenantTopic TenantTopicConfig `yaml:"tenant_topic" category:"experimental"` @@ -114,6 +120,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.") fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.") fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") + fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. 
Defaults to false.") } func (cfg *Config) Validate() error { @@ -143,12 +150,12 @@ type Distributor struct { cfg Config ingesterCfg ingester.Config logger log.Logger - clientCfg client.Config + clientCfg ingester_client.Config tenantConfigs *runtime.TenantConfigs tenantsRetention *retention.TenantsRetention ingestersRing ring.ReadRing validator *Validator - pool *ring_client.Pool + ingesterClients *ring_client.Pool tee Tee rateStore RateStore @@ -184,10 +191,20 @@ type Distributor struct { ingesterTasks chan pushIngesterTask ingesterTaskWg sync.WaitGroup + // Will succeed usage tracker in future. + limitsFrontendRing ring.ReadRing + limitsFrontends *ring_client.Pool + // kafka kafkaWriter KafkaProducer partitionRing ring.PartitionRingReader + // The number of partitions for the stream metadata topic. Unlike stream + // records, where entries are sharded over just the active partitions, + // stream metadata is sharded over all partitions, and all partitions + // are consumed. + numMetadataPartitions int + // kafka metrics kafkaAppends *prometheus.CounterVec kafkaWriteBytesTotal prometheus.Counter @@ -199,7 +216,7 @@ type Distributor struct { func New( cfg Config, ingesterCfg ingester.Config, - clientCfg client.Config, + clientCfg ingester_client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, partitionRing ring.PartitionRingReader, @@ -208,19 +225,22 @@ func New( metricsNamespace string, tee Tee, usageTracker push.UsageTracker, + limitsFrontendCfg limits_frontend_client.Config, + limitsFrontendRing ring.ReadRing, + numMetadataPartitions int, logger log.Logger, ) (*Distributor, error) { - factory := cfg.factory - if factory == nil { - factory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) { - return client.New(clientCfg, addr) + ingesterClientFactory := cfg.factory + if ingesterClientFactory == nil { + ingesterClientFactory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) { + return ingester_client.New(clientCfg, addr) }) } - internalFactory := func(addr string) (ring_client.PoolClient, error) { + internalIngesterClientFactory := func(addr string) (ring_client.PoolClient, error) { internalCfg := clientCfg internalCfg.Internal = true - return client.New(internalCfg, addr) + return ingester_client.New(internalCfg, addr) } validator, err := NewValidator(overrides, usageTracker) @@ -228,6 +248,8 @@ func New( return nil, err } + limitsFrontendClientFactory := limits_frontend_client.NewPoolFactory(limitsFrontendCfg) + // Create the configured ingestion rate limit strategy (local or global). 
var ingestionRateStrategy limiter.RateLimiterStrategy var distributorsLifecycler *ring.BasicLifecycler @@ -274,7 +296,7 @@ func New( tenantsRetention: retention.NewTenantsRetention(overrides), ingestersRing: ingestersRing, validator: validator, - pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, logger, metricsNamespace), + ingesterClients: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, ingesterClientFactory, logger, metricsNamespace), labelCache: labelCache, shardTracker: NewShardTracker(), healthyInstancesCount: atomic.NewUint32(0), @@ -335,6 +357,15 @@ func New( writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"), kafkaWriter: kafkaWriter, partitionRing: partitionRing, + limitsFrontendRing: limitsFrontendRing, + limitsFrontends: limits_frontend_client.NewPool( + limits_frontend.RingName, + limitsFrontendCfg.PoolConfig, + limitsFrontendRing, + limitsFrontendClientFactory, + logger, + ), + numMetadataPartitions: numMetadataPartitions, } if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { @@ -366,7 +397,7 @@ func New( "rate-store", clientCfg.PoolConfig, ingestersRing, - ring_client.PoolAddrFunc(internalFactory), + ring_client.PoolAddrFunc(internalIngesterClientFactory), logger, metricsNamespace, ), @@ -375,7 +406,7 @@ func New( ) d.rateStore = rs - servs = append(servs, d.pool, rs) + servs = append(servs, d.ingesterClients, rs) d.subservices, err = services.NewManager(servs...) if err != nil { return nil, errors.Wrap(err, "services manager") @@ -417,8 +448,9 @@ func (d *Distributor) stopping(_ error) error { } type KeyedStream struct { - HashKey uint32 - Stream logproto.Stream + HashKey uint32 + HashKeyNoShard uint64 + Stream logproto.Stream } // TODO taken from Cortex, see if we can refactor out an usable interface. @@ -474,6 +506,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe return &logproto.PushResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, validation.MissingStreamsErrorMsg) } + if d.cfg.IngestLimitsEnabled { + exceedsLimits, err := d.exceedsLimits(ctx, tenantID, req.Streams) + if err != nil { + level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err) + } else if len(exceedsLimits.RejectedStreams) > 0 { + level.Error(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID) + } else { + level.Debug(d.logger).Log("msg", "request accepted", "tenant", tenantID) + } + } + // First we flatten out the request into a list of samples. // We use the heuristic of 1 sample per TS to size the array. // We also work out the hash value at the same time. 
@@ -494,8 +537,9 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe return } streams = append(streams, KeyedStream{ - HashKey: lokiring.TokenFor(tenantID, stream.Labels), - Stream: stream, + HashKey: lokiring.TokenFor(tenantID, stream.Labels), + HashKeyNoShard: stream.Hash, + Stream: stream, }) } @@ -932,7 +976,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg) if shardCount <= 1 { - return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), Stream: stream}} + return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream}} } d.streamShardCount.Inc() @@ -976,8 +1020,9 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena shard := d.createShard(streamLabels, streamPattern, shardNum, entriesPerShard) derivedStreams = append(derivedStreams, KeyedStream{ - HashKey: lokiring.TokenFor(tenantID, shard.Labels), - Stream: shard, + HashKey: lokiring.TokenFor(tenantID, shard.Labels), + HashKeyNoShard: stream.Hash, + Stream: shard, }) if shardStreamsCfg.LoggingEnabled { @@ -1107,9 +1152,65 @@ func (d *Distributor) sendStreams(task pushIngesterTask) { } } +func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, streams []logproto.Stream) (*logproto.ExceedsLimitsResponse, error) { + // We use an FNV-1 of all stream hashes in the request to load balance requests + // to limits-frontends instances. + h := fnv.New32() + + // The distributor sends the hashes of all streams in the request to the + // limits-frontend. The limits-frontend is responsible for deciding if + // the request would exceed the tenants limits, and if so, which streams + // from the request caused it to exceed its limits. + streamHashes := make([]*logproto.StreamMetadata, 0, len(streams)) + for _, stream := range streams { + // Add the stream hash to FNV-1. + buf := make([]byte, binary.MaxVarintLen64) + binary.PutUvarint(buf, stream.Hash) + _, _ = h.Write(buf) + // Add the stream hash to the request. This is sent to limits-frontend. + streamHashes = append(streamHashes, &logproto.StreamMetadata{ + StreamHash: stream.Hash, + }) + } + + req := logproto.ExceedsLimitsRequest{ + Tenant: tenantID, + Streams: streamHashes, + } + + // Get the limits-frontend instances from the ring. + var descs [5]ring.InstanceDesc + rs, err := d.limitsFrontendRing.Get(h.Sum32(), limits_frontend_client.LimitsRead, descs[0:], nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err) + } + + var lastErr error + // Send the request to the limits-frontend to see if it exceeds the tenant + // limits. If the RPC fails, failover to the next instance in the ring. + for _, instance := range rs.Instances { + c, err := d.limitsFrontends.GetClientFor(instance.Addr) + if err != nil { + lastErr = err + continue + } + + client := c.(logproto.IngestLimitsFrontendClient) + resp, err := client.ExceedsLimits(ctx, &req) + if err != nil { + lastErr = err + continue + } + + return resp, nil + } + + return nil, lastErr +} + // TODO taken from Cortex, see if we can refactor out an usable interface. 
func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error { - c, err := d.pool.GetClientFor(ingester.Addr) + c, err := d.ingesterClients.GetClientFor(ingester.Addr) if err != nil { return err } @@ -1150,20 +1251,48 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, if len(stream.Stream.Entries) == 0 { return nil } - partitionID, err := subring.ActivePartitionForKey(stream.HashKey) + + // The distributor writes stream records to one of the active partitions + // in the partition ring. The number of active partitions is equal to the + // number of ingesters. + streamPartitionID, err := subring.ActivePartitionForKey(stream.HashKey) if err != nil { d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() return fmt.Errorf("failed to find active partition for stream: %w", err) } - startTime := time.Now() - - records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes) + records, err := kafka.Encode( + streamPartitionID, + tenant, + stream.Stream, + d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes, + ) if err != nil { - d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + d.kafkaAppends.WithLabelValues( + fmt.Sprintf("partition_%d", streamPartitionID), + "fail", + ).Inc() return fmt.Errorf("failed to marshal write request to records: %w", err) } + // However, unlike stream records, the distributor writes stream metadata + // records to one of a fixed number of partitions, the size of which is + // determined ahead of time. It does not use a ring. The reason for this + // is that we want to be able to scale components that consume metadata + // records independent of ingesters. + metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions)) + metadata, err := kafka.EncodeStreamMetadata( + metadataPartitionID, + d.cfg.KafkaConfig.Topic, + tenant, + stream.HashKeyNoShard, + ) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + records = append(records, metadata) + d.kafkaRecordsPerRequest.Observe(float64(len(records))) produceResults := d.kafkaWriter.ProduceSync(ctx, records) @@ -1176,10 +1305,10 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, var finalErr error for _, result := range produceResults { if result.Err != nil { - d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", streamPartitionID), "fail").Inc() finalErr = result.Err } else { - d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc() + d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", streamPartitionID), "success").Inc() } } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 70277c866b..545e781328 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -39,6 +39,8 @@ import ( "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" + limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend" + limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client" loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -1891,6 +1893,15 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation 
ring: partitionRing, } + limitsFrontendRing, err := ring.New(ring.Config{ + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 60 * time.Minute, + ReplicationFactor: 1, + }, limits_frontend.RingKey, limits_frontend.RingKey, nil, nil) + require.NoError(t, err) + loopbackName, err := loki_net.LoopbackInterfaceName() require.NoError(t, err) @@ -1917,8 +1928,9 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation require.NoError(t, err) ingesterConfig := ingester.Config{MaxChunkAge: 2 * time.Hour} + limitsFrontendCfg := limits_frontend_client.Config{} - d, err := New(distributorConfig, ingesterConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) + d, err := New(distributorConfig, ingesterConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, partitionRingReader, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, limitsFrontendCfg, limitsFrontendRing, 1, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) distributors[i] = d diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index c17e316420..54f2da007d 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -780,7 +780,7 @@ func (t *Loki) setupModuleManager() error { OverridesExporter: {Overrides, Server, UI}, TenantConfigs: {RuntimeConfig}, UI: {Server}, - Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, UI}, + Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, IngestLimitsFrontendRing, UI}, IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV}, IngestLimits: {MemberlistKV, Server}, IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index aff20caa20..b76a7c0a8d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -370,6 +370,9 @@ func (t *Loki) initDistributor() (services.Service, error) { t.Cfg.MetricsNamespace, t.Tee, t.UsageTracker, + t.Cfg.IngestLimitsFrontendClient, + t.ingestLimitsFrontendRing, + t.Cfg.IngestLimits.NumPartitions, logger, ) if err != nil {
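
The new check is opt-in. Mirroring the documentation hunk above, it can be switched on either with the `-distributor.ingest-limits-enabled` CLI flag registered in `RegisterFlags`, or in the `distributor` block of the YAML configuration (a minimal sketch; the surrounding block layout is assumed from the docs file). Note that with this change the distributor only logs when the limits-frontend reports rejected streams; the push request itself is still accepted.

```yaml
# Minimal sketch: enable the limits check in the distributor.
# Equivalent CLI flag: -distributor.ingest-limits-enabled
distributor:
  ingest_limits_enabled: true
```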
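Requests to the limits-frontend are load-balanced by hashing every stream hash in the push request with 32-bit FNV-1; the resulting key selects instances from the limits-frontend ring, and `exceedsLimits` fails over through the returned replication set on RPC errors. A minimal, self-contained sketch of the key computation as the patch performs it (each hash is uvarint-encoded into a fixed `MaxVarintLen64` buffer, trailing zero bytes included); the helper name is illustrative:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// limitsFrontendKey reproduces the ring key computed in exceedsLimits:
// a 32-bit FNV-1 hash over each stream hash, uvarint-encoded into a
// fixed-size buffer (trailing zero bytes included, as in the patch).
func limitsFrontendKey(streamHashes []uint64) uint32 {
	h := fnv.New32()
	for _, hash := range streamHashes {
		buf := make([]byte, binary.MaxVarintLen64)
		binary.PutUvarint(buf, hash)
		_, _ = h.Write(buf)
	}
	return h.Sum32()
}

func main() {
	// The same set of stream hashes always yields the same key, so repeated
	// pushes for the same streams tend to hit the same limits-frontend.
	fmt.Println(limitsFrontendKey([]uint64{12345, 67890}))
}
```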
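`KeyedStream` now carries two hashes: `HashKey`, the tenant/labels token that differs for every shard and selects an active partition for stream records, and `HashKeyNoShard`, the unsharded `stream.Hash` that is identical for all shards of a stream. Stream metadata is sharded with a plain modulo over the fixed number of metadata partitions, so every shard of a stream lands on the same metadata partition. A small sketch of that mapping (the function name is illustrative; the patch computes it inline in `sendStreamToKafka`):

```go
package main

import "fmt"

// metadataPartitionFor mirrors the inline computation in sendStreamToKafka:
// the unsharded stream hash modulo the fixed number of metadata partitions.
func metadataPartitionFor(hashKeyNoShard uint64, numMetadataPartitions int) int32 {
	return int32(hashKeyNoShard % uint64(numMetadataPartitions))
}

func main() {
	const numMetadataPartitions = 8
	// Every shard of a stream shares the same unsharded hash, so all of its
	// metadata records go to one partition even though the sharded HashKey
	// (and therefore the stream-record partition) differs per shard.
	streamHash := uint64(0xdeadbeefcafe)
	fmt.Println(metadataPartitionFor(streamHash, numMetadataPartitions))
}
```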