From 60c76318e66d33a726b41d8542e71663e581c92a Mon Sep 17 00:00:00 2001
From: Periklis Tsirakidis
Date: Mon, 2 Jun 2025 10:06:53 +0200
Subject: [PATCH] fix(limits): Read the consumer group and topic from the ingest-limits config (#17831)

---
 docs/sources/shared/configuration.md         |  9 +++++++++
 pkg/limits/config.go                         | 21 +++++++++++++++++++++
 pkg/limits/frontend/ring.go                  |  6 ++----
 pkg/limits/service.go                        | 11 ++++-------
 pkg/limits/service_test.go                   |  5 +++--
 pkg/loki/loki.go                             | 12 +++++++-----
 tools/dev/kafka/docker-compose.yaml          |  6 +++---
 tools/dev/kafka/loki-local-config.debug.yaml |  5 +++++
 tools/stream-generator/generator/kafka.go    |  2 +-
 9 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index dd515ee026..acb1eb05bc 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -1069,6 +1069,15 @@ ingest_limits:
     # CLI flag: -ingest-limits.lifecycler.ID
     [id: <string> | default = ""]
 
+  # The consumer group for the Kafka topic used to read stream metadata records.
+  # CLI flag: -ingest-limits.consumer-group
+  [consumer_group: <string> | default = "ingest-limits"]
+
+  # The topic for the Kafka topic used to read and write stream metadata
+  # records.
+  # CLI flag: -ingest-limits.topic
+  [topic: <string> | default = ""]
+
 ingest_limits_frontend:
   client_config:
     # Configures client gRPC connections to limits service.
diff --git a/pkg/limits/config.go b/pkg/limits/config.go
index 9396dca9ba..ba7bd69b3d 100644
--- a/pkg/limits/config.go
+++ b/pkg/limits/config.go
@@ -17,6 +17,7 @@ const (
 	DefaultBucketSize    = 1 * time.Minute
 	DefaultEvictInterval = 30 * time.Minute
 	DefaultNumPartitions = 64
+	DefaultConsumerGroup = "ingest-limits"
 )
 
 // Config represents the configuration for the ingest limits service.
@@ -49,6 +50,8 @@ type Config struct {
 	// LifecyclerConfig is the config to build a ring lifecycler.
 	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
 	KafkaConfig      kafka.Config          `yaml:"-"`
+	ConsumerGroup    string                `yaml:"consumer_group"`
+	Topic            string                `yaml:"topic"`
 
 	// Deprecated.
 	WindowSize time.Duration `yaml:"window_size" doc:"hidden|deprecated"`
@@ -93,6 +96,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 		DefaultNumPartitions,
 		"The number of partitions for the Kafka topic used to read and write stream metadata. It is fixed, not a maximum.",
 	)
+	f.StringVar(
+		&cfg.ConsumerGroup,
+		"ingest-limits.consumer-group",
+		DefaultConsumerGroup,
+		"The consumer group for the Kafka topic used to read stream metadata records.",
+	)
+	f.StringVar(
+		&cfg.Topic,
+		"ingest-limits.topic",
+		"",
+		"The topic for the Kafka topic used to read and write stream metadata records.",
+	)
 }
 
 func (cfg *Config) Validate() error {
@@ -114,5 +129,11 @@ func (cfg *Config) Validate() error {
 	if cfg.NumPartitions <= 0 {
 		return errors.New("num-partitions must be greater than 0")
 	}
+	if cfg.ConsumerGroup == "" {
+		return errors.New("consumer-group must be set")
+	}
+	if cfg.Topic == "" {
+		return errors.New("topic must be set")
+	}
 	return nil
 }
diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go
index f0ea84be8e..e26d8b5ad4 100644
--- a/pkg/limits/frontend/ring.go
+++ b/pkg/limits/frontend/ring.go
@@ -83,11 +83,9 @@ func (g *ringGatherer) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimi
 	// Make a copy of the streams from the request. We will prune this slice
 	// each time we receive the responses from a zone.
 	streams := make([]*proto.StreamMetadata, 0, len(req.Streams))
-	for _, stream := range req.Streams {
-		streams = append(streams, stream)
-	}
+	streams = append(streams, req.Streams...)
 	// Query each zone as ordered in zonesToQuery. If a zone answers all
-	// streams, the request is satisifed and there is no need to query
+	// streams, the request is satisfied and there is no need to query
 	// subsequent zones. If a zone answers just a subset of streams
 	// (i.e. the instance that is consuming a partition is unavailable or the
 	// partition that owns one or more streams does not have a consumer)
diff --git a/pkg/limits/service.go b/pkg/limits/service.go
index af710acace..7461b623e2 100644
--- a/pkg/limits/service.go
+++ b/pkg/limits/service.go
@@ -26,9 +26,6 @@ const (
 	// Ring
 	RingKey  = "ingest-limits"
 	RingName = "ingest-limits"
-
-	// Kafka
-	consumerGroup = "ingest-limits"
 )
 
 // MetadataTopic returns the metadata topic name for the given topic.
@@ -135,13 +132,13 @@ func New(cfg Config, lims Limits, logger log.Logger, reg prometheus.Registerer)
 
 	// Create a copy of the config to modify the topic
 	kCfg := cfg.KafkaConfig
-	kCfg.Topic = MetadataTopic(kCfg.Topic)
+	kCfg.Topic = cfg.Topic
 	kCfg.AutoCreateTopicEnabled = true
 	kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions
 
 	offsetManager, err := partition.NewKafkaOffsetManager(
 		kCfg,
-		"ingest-limits",
+		cfg.ConsumerGroup,
 		logger,
 		prometheus.NewRegistry(),
 	)
@@ -157,8 +154,8 @@ func New(cfg Config, lims Limits, logger log.Logger, reg prometheus.Registerer)
 	)
 
 	s.clientReader, err = client.NewReaderClient("ingest-limits-reader", kCfg, logger, reg,
-		kgo.ConsumerGroup(consumerGroup),
-		kgo.ConsumeTopics(kCfg.Topic),
+		kgo.ConsumerGroup(cfg.ConsumerGroup),
+		kgo.ConsumeTopics(cfg.Topic),
 		kgo.Balancers(kgo.CooperativeStickyBalancer()),
 		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(s.clock.Now().Add(-s.cfg.ActiveWindow).UnixMilli())),
 		kgo.DisableAutoCommit(),
diff --git a/pkg/limits/service_test.go b/pkg/limits/service_test.go
index 71a5245391..590d31dac6 100644
--- a/pkg/limits/service_test.go
+++ b/pkg/limits/service_test.go
@@ -463,10 +463,11 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
 func TestNew(t *testing.T) {
 	cfg := Config{
 		KafkaConfig: kafka.Config{
-			Topic:        "test-topic",
 			WriteTimeout: 10 * time.Second,
 		},
-		ActiveWindow: time.Hour,
+		ConsumerGroup: "test-consumer-group",
+		Topic:         "test-topic.metadata",
+		ActiveWindow:  time.Hour,
 		LifecyclerConfig: ring.LifecyclerConfig{
 			RingConfig: ring.Config{
 				KVStore: kv.Config{
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 3f5fe47b3a..98e2e4922a 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -289,11 +289,13 @@ func (c *Config) Validate() error {
 	if err := c.LimitsConfig.Validate(); err != nil {
 		errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config"))
 	}
-	if err := c.IngestLimits.Validate(); err != nil {
-		errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits config"))
-	}
-	if err := c.IngestLimitsFrontend.Validate(); err != nil {
-		errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend config"))
+	if c.IngestLimits.Enabled {
+		if err := c.IngestLimits.Validate(); err != nil {
+			errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits config"))
+		}
+		if err := c.IngestLimitsFrontend.Validate(); err != nil {
+			errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend config"))
+		}
 	}
 	if err := c.IngestLimitsFrontendClient.Validate(); err != nil {
 		errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend_client config"))
diff --git a/tools/dev/kafka/docker-compose.yaml b/tools/dev/kafka/docker-compose.yaml
index 43771a8e99..4a0735cbeb 100644
--- a/tools/dev/kafka/docker-compose.yaml
+++ b/tools/dev/kafka/docker-compose.yaml
@@ -13,7 +13,7 @@ services:
       - ./provisioning:/etc/grafana/provisioning/
       - ./data/grafana/:/var/lib/grafana/
     extra_hosts:
-      - 'host.docker.internal:host-gateway'
+      - "host.docker.internal:host-gateway"
   kafka-ui:
     image: provectuslabs/kafka-ui:latest
     ports:
@@ -25,7 +25,7 @@ services:
     depends_on:
      - broker
     extra_hosts:
-      - 'host.docker.internal:host-gateway'
+      - "host.docker.internal:host-gateway"
   broker:
     image: apache/kafka:latest
     hostname: broker
@@ -49,5 +49,5 @@ services:
       KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
       CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
   generator:
-    image: ctovena/log-generator:latest
+    image: theperiklis/log-generator:latest
     command: -url http://host.docker.internal:3100/loki/api/v1/push
diff --git a/tools/dev/kafka/loki-local-config.debug.yaml b/tools/dev/kafka/loki-local-config.debug.yaml
index f68e43e9dc..8fbc0ba0ea 100644
--- a/tools/dev/kafka/loki-local-config.debug.yaml
+++ b/tools/dev/kafka/loki-local-config.debug.yaml
@@ -20,6 +20,10 @@ common:
 
 kafka_config:
   topic: "loki"
+  reader_config:
+    address: localhost:9092
+  writer_config:
+    address: localhost:9092
 
 querier:
   query_partition_ingesters: true
@@ -28,6 +32,7 @@ ingest_limits:
   enabled: true
   active_window: 1h
   num_partitions: 1
+  topic: "loki.metadata"
   lifecycler:
     ring:
       kvstore:
diff --git a/tools/stream-generator/generator/kafka.go b/tools/stream-generator/generator/kafka.go
index 1551939d61..865082b1df 100644
--- a/tools/stream-generator/generator/kafka.go
+++ b/tools/stream-generator/generator/kafka.go
@@ -107,7 +107,7 @@ func (s *Generator) sendStreamsToKafka(ctx context.Context, streams []distributo
 			Key:       []byte(tenant),
 			Value:     b,
 			Partition: partitionID,
-			Topic:     limits.MetadataTopic(s.cfg.Kafka.Topic),
+			Topic:     s.cfg.Kafka.Topic,
 		}
 
 		// Send to Kafka
 		produceResults := s.writer.ProduceSync(ctx, []*kgo.Record{metadataRecord})
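
Usage note (not part of the patch): a minimal sketch of how the new settings might look once this change is applied, mirroring the loki-local-config.debug.yaml change earlier in the patch. The topic name "loki.metadata" and the localhost broker address are illustrative values, not defaults; only consumer_group has a default ("ingest-limits"), while topic is empty by default and, now that validation only runs when ingest_limits is enabled, an empty topic is rejected with "topic must be set".

    kafka_config:
      topic: "loki"
      reader_config:
        address: localhost:9092   # illustrative broker address
      writer_config:
        address: localhost:9092

    ingest_limits:
      enabled: true
      active_window: 1h
      num_partitions: 1
      # Stream metadata uses its own topic, configured here rather than derived
      # from kafka_config.topic as before this patch.
      topic: "loki.metadata"            # required when enabled; no default
      consumer_group: "ingest-limits"   # optional; defaults to "ingest-limits"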