diff --git a/cmd/loki/dataobj-inmemory-config.yaml b/cmd/loki/dataobj-inmemory-config.yaml new file mode 100644 index 0000000000..e5ed5ce00e --- /dev/null +++ b/cmd/loki/dataobj-inmemory-config.yaml @@ -0,0 +1,117 @@ +# Single-binary Loki with dataobjects, no Kafka. +# Writes go to the in-process stream sink -> dataobj consumer -> object storage. +# No ingester/chunk writes; query via v2 engine (dataobjects). + +auth_enabled: false + +server: + http_listen_port: 3100 + grpc_listen_port: 9096 + log_level: info + grpc_server_max_concurrent_streams: 1000 + +common: + instance_addr: 127.0.0.1 + path_prefix: /tmp/loki + scratch_path: /tmp/loki/scratch + storage: + filesystem: + chunks_directory: /tmp/loki/chunks + rules_directory: /tmp/loki/rules + replication_factor: 1 + ring: + kvstore: + store: inmemory + +# Dataobj: in-memory ingest (no Kafka) +dataobj: + enabled: true + storage_bucket_prefix: "dataobj/" + consumer: + ingest_mode: inmemory + idle_flush_timeout: 2m + max_builder_age: 5m + # Optional: inmemory_channel_capacity: 10000, inmemory_push_timeout: 5s + metastore: + index_storage_prefix: "index/v0" + partition_ratio: 10 + +# No Kafka +# kafka_config is omitted; not used when ingest_mode=inmemory + +# Querier +querier: + query_partition_ingesters: false + +# V2 query engine: use dataobjects for all data (storage_lag=0 so no cutoff to chunks) +query_engine: + enable: true + enable_engine_router: false + storage_lag: 0 + +# Ingest limits off (they use Kafka) +ingest_limits: + enabled: false + +ingest_limits_frontend: + num_partitions: 1 + lifecycler: + ring: + kvstore: + store: inmemory + replication_factor: 1 + +# Ingester not used for writes (distributor sends only to dataobj stream sink) +ingester: + wal: + enabled: false + kafka_ingestion: + enabled: false + lifecycler: + id: "loki.local" + +# Distributor: only dataobj in-memory sink (no Kafka, no ingester) +distributor: + kafka_writes_enabled: false + ingester_writes_enabled: false + ingest_limits_enabled: false + ingest_limits_dry_run_enabled: false + +# Query range +query_range: + cache_volume_results: false + results_cache: + cache: + embedded_cache: + enabled: false + max_size_mb: 100 + +limits_config: + metric_aggregation_enabled: true + max_global_streams_per_user: 1000 + +# Schema: TSDB + filesystem object store (dataobj uses this for objects/index) +schema_config: + configs: + - from: 2020-10-24 + store: tsdb + object_store: filesystem + schema: v13 + index: + prefix: index_ + period: 24h + +storage_config: + object_store: + filesystem: + dir: /tmp/loki + +# Disabled to avoid internal push load while testing single-binary dataobj +pattern_ingester: + enabled: false + +ruler: + alertmanager_url: http://localhost:9093 + +frontend: + encoding: protobuf diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index ce3ce23811..80e435bb7a 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1522,6 +1522,16 @@ dataobj: # CLI flag: -dataobj-consumer.max-builder-age [max_builder_age: | default = 1h] + # How records are ingested: "kafka" reads from a Kafka topic; "inmemory" + # uses an in-process channel (experimental, single-node, no durability + # guarantees, each replica holds independent data). + # CLI flag: -dataobj-consumer.ingest-mode + [ingest_mode: | default = "kafka"] + + # Internal buffer size for records for inmemory ingestion. + # CLI flag: -dataobj-consumer.channel-size + [channel_size: | default = 10000] + # The name of the Kafka topic. # CLI flag: -dataobj-consumer.topic [topic: | default = ""] @@ -3336,6 +3346,11 @@ dataobj_tee: # to 0 to disable batching. # CLI flag: -distributor.dataobj-tee.rate-batch-window [rate_batch_window: | default = 0s] + +# Timeout for sending a record to the in-memory queue before returning +# backpressure to the caller. Defaults to 5s. Set to 0 for no timeout. +# CLI flag: -distributor.inmemory-dataobj-push-timeout +[inmemory_dataobj_push_timeout: | default = 5s] ``` ### etcd diff --git a/integration/client/client.go b/integration/client/client.go index fe32f26eaa..0d4ce79035 100644 --- a/integration/client/client.go +++ b/integration/client/client.go @@ -254,6 +254,25 @@ func (c *Client) Metrics() (string, error) { return sb.String(), nil } +// FlushDataobj triggers a flush of the in-memory dataobj consumer. +func (c *Client) FlushDataobj() error { + req, err := c.request(context.Background(), "POST", fmt.Sprintf("%s/dataobj-consumer/flush", c.baseURL)) + if err != nil { + return err + } + + res, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode/100 == 2 { + return nil + } + return fmt.Errorf("request failed with status code %d", res.StatusCode) +} + // Flush all in-memory chunks held by the ingesters to the backing store func (c *Client) Flush() error { req, err := c.request(context.Background(), "POST", fmt.Sprintf("%s/flush", c.baseURL)) diff --git a/integration/loki_dataobj_inmemory_pipeline_test.go b/integration/loki_dataobj_inmemory_pipeline_test.go new file mode 100644 index 0000000000..aed316546f --- /dev/null +++ b/integration/loki_dataobj_inmemory_pipeline_test.go @@ -0,0 +1,111 @@ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/integration/client" + "github.com/grafana/loki/v3/integration/cluster" +) + +func TestInmemoryPipeline(t *testing.T) { + clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) { + c.SetSchemaVer("v13") + }) + defer func() { + assert.NoError(t, clu.Cleanup()) + }() + + tAll := clu.AddComponent( + "all", + "-target=all", + "-dataobj.enabled=true", + "-dataobj-consumer.ingest-mode=inmemory", + "-dataobj-consumer.target-page-size=2KB", + "-dataobj-consumer.target-builder-memory-limit=1MB", + "-dataobj-consumer.buffer-size=256KB", + "-dataobj-consumer.target-section-size=512KB", + "-dataobj-consumer.section-stripe-merge-limit=2", + "-dataobj-consumer.sha-prefix-size=2", + "-dataobj-consumer.idle-flush-timeout=100ms", + "-distributor.ingester-writes-enabled=false", + "-pattern-ingester.enabled=false", + "-query-engine.enable=true", + "-query-engine.storage-lag=0", + ) + scratchPath := tAll.ClusterSharedPath() + "/scratch" + require.NoError(t, os.MkdirAll(scratchPath, 0o755)) + tAll.WithExtraConfig(fmt.Sprintf(` +common: + scratch_path: %s + ring: + kvstore: + store: inmemory +ingest_limits: + enabled: false +`, scratchPath)) + + require.NoError(t, clu.Run()) + + tenantID := randStringRunes() + cli := client.New(tenantID, "", tAll.HTTPURL()) + cli.Now = time.Now().Add(-30 * time.Second) + + t.Run("round-trip", func(t *testing.T) { + require.NoError(t, cli.PushLogLine("pipeline-line-1", cli.Now, nil, map[string]string{"job": "pipeline-test"})) + require.NoError(t, cli.PushLogLine("pipeline-line-2", cli.Now.Add(-time.Second), nil, map[string]string{"job": "pipeline-test"})) + + require.NoError(t, cli.FlushDataobj()) + + var lines []string + require.Eventually(t, func() bool { + qresp, err := cli.RunRangeQuery(context.Background(), `{job="pipeline-test"}`) + if err != nil { + return false + } + lines = nil + for _, stream := range qresp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + return len(lines) >= 2 + }, 30*time.Second, 100*time.Millisecond, "pushed logs never appeared in query results") + + assert.ElementsMatch(t, []string{"pipeline-line-1", "pipeline-line-2"}, lines) + }) + + t.Run("idle-flush-timeout", func(t *testing.T) { + tenantIdle := randStringRunes() + cliIdle := client.New(tenantIdle, "", tAll.HTTPURL()) + cliIdle.Now = time.Now() + + require.NoError(t, cliIdle.PushLogLine("idle-flush-line", cliIdle.Now, nil, map[string]string{"job": "idle-test"})) + + // Do NOT call the flush endpoint — let the idle flush timeout (100ms) trigger naturally. + var lines []string + require.Eventually(t, func() bool { + qresp, err := cliIdle.RunRangeQuery(context.Background(), `{job="idle-test"}`) + if err != nil { + return false + } + lines = nil + for _, stream := range qresp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + return len(lines) >= 1 + }, 15*time.Second, 200*time.Millisecond, "idle flush did not trigger within timeout") + + assert.Contains(t, lines, "idle-flush-line") + }) +} diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go index 34d78a251a..1644e1bfcf 100644 --- a/pkg/dataobj/consumer/config.go +++ b/pkg/dataobj/consumer/config.go @@ -3,6 +3,7 @@ package consumer import ( "errors" "flag" + "fmt" "time" "github.com/grafana/dskit/ring" @@ -13,6 +14,16 @@ import ( util_log "github.com/grafana/loki/v3/pkg/util/log" ) +// IngestMode determines how the consumer receives records. +type IngestMode string + +const ( + // IngestModeKafka reads records from a Kafka topic (default). + IngestModeKafka IngestMode = "kafka" + // IngestModeInMemory receives records via an in-process Go channel (no Kafka required). + IngestModeInMemory IngestMode = "inmemory" +) + type Config struct { logsobj.BuilderConfig LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` @@ -21,6 +32,14 @@ type Config struct { IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"` MaxBuilderAge time.Duration `yaml:"max_builder_age"` + // IngestMode controls how records are ingested. "kafka" (default) reads from + // a Kafka topic; "inmemory" uses an in-process channel (no Kafka needed). + IngestMode IngestMode `yaml:"ingest_mode"` + + // ChannelSize is the capacity of the buffered channel used to pass records + // from the distributor to the consumer in inmemory mode. + ChannelSize int `yaml:"channel_size"` + // This is temporary until we move to kafkav2. Topic string `yaml:"topic"` } @@ -35,8 +54,15 @@ func (cfg *Config) Validate() error { if err := cfg.UploaderConfig.Validate(); err != nil { return err } - if cfg.Topic == "" { - return errors.New("topic is required") + switch cfg.IngestMode { + case IngestModeKafka, "": // empty defaults to kafka + if cfg.Topic == "" { + return errors.New("topic is required") + } + case IngestModeInMemory: + // topic not required in inmemory mode + default: + return fmt.Errorf("unknown ingest_mode %q: must be %q or %q", cfg.IngestMode, IngestModeKafka, IngestModeInMemory) } return nil } @@ -50,6 +76,18 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix, f, util_log.Logger) cfg.PartitionRingConfig.RegisterFlagsWithPrefix(prefix, f) cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f) + f.StringVar( + (*string)(&cfg.IngestMode), + prefix+"ingest-mode", + string(IngestModeKafka), + `How records are ingested: "kafka" reads from a Kafka topic; "inmemory" uses an in-process channel (experimental, single-node, no durability guarantees, each replica holds independent data).`, + ) + f.IntVar( + &cfg.ChannelSize, + prefix+"channel-size", + 10000, + `Internal buffer size for records for inmemory ingestion.`, + ) f.StringVar( &cfg.Topic, prefix+"topic", diff --git a/pkg/dataobj/consumer/http.go b/pkg/dataobj/consumer/http.go index 571f97cd01..8406cf2024 100644 --- a/pkg/dataobj/consumer/http.go +++ b/pkg/dataobj/consumer/http.go @@ -100,6 +100,22 @@ func (s *Service) cancelDelayedDownscale(w http.ResponseWriter, r *http.Request) s.respondWithCurrentPartitionState(w, r) } +// FlushHandler triggers an immediate flush of any in-flight builders. +// Used in testing and operational tooling to force data into object storage +// without waiting for idle or max-age timeouts. +func (s *Service) FlushHandler(w http.ResponseWriter, r *http.Request) { + if s.processor == nil || s.State() != services.Running { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + if err := s.processor.Flush(r.Context()); err != nil { + level.Error(s.logger).Log("msg", "flush failed", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} + func (s *Service) respondWithCurrentPartitionState(w http.ResponseWriter, r *http.Request) { state, stateTimestamp, err := s.partitionInstanceLifecycler.GetPartitionState(r.Context()) if err != nil { diff --git a/pkg/dataobj/consumer/mock_test.go b/pkg/dataobj/consumer/mock_test.go index 74a69c04dc..29de9ff26b 100644 --- a/pkg/dataobj/consumer/mock_test.go +++ b/pkg/dataobj/consumer/mock_test.go @@ -50,8 +50,21 @@ func (m *mockBucket) Get(_ context.Context, name string) (io.ReadCloser, error) return io.NopCloser(bytes.NewReader(data)), nil } -func (m *mockBucket) GetRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) { - return nil, nil +func (m *mockBucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) { + m.mu.Lock() + defer m.mu.Unlock() + data, exists := m.uploads[name] + if !exists { + return nil, errors.New("object not found") + } + if off < 0 || off > int64(len(data)) { + return nil, errors.New("offset out of range") + } + end := off + length + if end > int64(len(data)) { + end = int64(len(data)) + } + return io.NopCloser(bytes.NewReader(data[off:end])), nil } func (m *mockBucket) Upload(_ context.Context, name string, r io.Reader) error { @@ -69,12 +82,37 @@ func (m *mockBucket) Iter(_ context.Context, _ string, _ func(string) error, _ . return nil } func (m *mockBucket) Name() string { return "mock" } -func (m *mockBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) { - return objstore.ObjectAttributes{}, nil +func (m *mockBucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) { + m.mu.Lock() + defer m.mu.Unlock() + data, exists := m.uploads[name] + if !exists { + return objstore.ObjectAttributes{}, errors.New("object not found") + } + return objstore.ObjectAttributes{Size: int64(len(data))}, nil } -func (m *mockBucket) GetAndReplace(_ context.Context, name string, _ func(io.ReadCloser) (io.ReadCloser, error)) error { - return m.Upload(context.Background(), name, io.NopCloser(bytes.NewReader([]byte{}))) +func (m *mockBucket) GetAndReplace(_ context.Context, name string, fn func(io.ReadCloser) (io.ReadCloser, error)) error { + m.mu.Lock() + var existing io.ReadCloser + if data, ok := m.uploads[name]; ok { + existing = io.NopCloser(bytes.NewReader(data)) + } + m.mu.Unlock() + + newRC, err := fn(existing) + if err != nil { + return err + } + defer newRC.Close() + data, err := io.ReadAll(newRC) + if err != nil { + return err + } + m.mu.Lock() + m.uploads[name] = data + m.mu.Unlock() + return nil } func (m *mockBucket) IsAccessDeniedErr(_ error) bool { diff --git a/pkg/dataobj/consumer/processor.go b/pkg/dataobj/consumer/processor.go index 56d4df00ad..814cf735f1 100644 --- a/pkg/dataobj/consumer/processor.go +++ b/pkg/dataobj/consumer/processor.go @@ -35,12 +35,19 @@ type flushCommitter interface { } // A processor receives records and builds data objects from them. +// flushRequest is used to send a flush request to the processor's Run loop. +type flushRequest struct { + done chan<- error +} + type processor struct { *services.BasicService builder builder decoder *kafka.Decoder records chan *kgo.Record flushCommitter flushCommitter + // flushRequests is used to safely trigger a flush from outside the Run loop. + flushRequests chan flushRequest // lastOffset contains the offset of the last record appended to the data object // builder. It is used to commit the correct offset after a flush. @@ -99,6 +106,7 @@ func newProcessor( decoder: decoder, records: records, flushCommitter: flushCommitter, + flushRequests: make(chan flushRequest, 1), idleFlushTimeout: idleFlushTimeout, maxBuilderAge: maxBuilderAge, metrics: newMetrics(reg), @@ -119,8 +127,53 @@ func (p *processor) running(ctx context.Context) error { return p.Run(ctx) } -// stopping implements [services.StoppingFn]. +// stopping implements [services.StoppingFn]. It drains any buffered records +// from the in-process channel (up to a 30s timeout) before returning, then +// flushes any accumulated data. This ensures that records buffered at SIGTERM +// are not silently lost in inmemory mode. +// +// Note: stopping() is called after Run() returns (dskit guarantees +// RunningFn happens-before StoppingFn), so there is no race with the run loop. +// The records channel remains open (owned by Service) and may still have +// buffered records written by the distributor before the push timeout fired. func (p *processor) stopping(_ error) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + dropped := 0 +drain: + for { + select { + case rec, ok := <-p.records: + if !ok { + break drain + } + if err := p.processRecord(ctx, rec); err != nil { + level.Error(p.logger).Log("msg", "failed to process record during shutdown drain", "err", err) + p.observeRecordErr(rec) + } + case <-ctx.Done(): + // Drain timed out — count remaining buffered records as dropped. + dropped = len(p.records) + break drain + default: + // Channel is empty — drain complete. + break drain + } + } + + if dropped > 0 { + level.Warn(p.logger).Log("msg", "inmemory drain timed out, records dropped", "count", dropped) + } else { + level.Info(p.logger).Log("msg", "inmemory channel drained cleanly on shutdown") + } + + // Flush whatever was accumulated during drain. + if !p.lastAppend.IsZero() && p.builder.GetEstimatedSize() > 0 { + if err := p.flush(ctx, "shutdown"); err != nil { + level.Error(p.logger).Log("msg", "failed to flush during shutdown drain", "err", err) + } + } return nil } @@ -149,6 +202,29 @@ func (p *processor) Run(ctx context.Context) error { if _, err := p.idleFlush(ctx); err != nil { level.Error(p.logger).Log("msg", "failed to idle flush", "err", err) } + case req := <-p.flushRequests: + // Drain any records that are already in the channel before flushing + // so that all pending data is included in the flush. + drain: + for { + select { + case rec, ok := <-p.records: + if !ok { + break drain + } + if err := p.processRecord(ctx, rec); err != nil { + level.Error(p.logger).Log("msg", "failed to process record during flush drain", "err", err) + p.observeRecordErr(rec) + } + default: + break drain + } + } + var err error + if !p.lastAppend.IsZero() && p.builder.GetEstimatedSize() > 0 { + err = p.flush(ctx, flushReasonIdle) + } + req.done <- err } } } @@ -222,6 +298,23 @@ func (p *processor) idleFlush(ctx context.Context) (bool, error) { return true, nil } +// Flush triggers an immediate flush via the processor's Run loop, draining any +// pending records from the channel first. Safe to call from any goroutine. +func (p *processor) Flush(ctx context.Context) error { + done := make(chan error, 1) + select { + case p.flushRequests <- flushRequest{done: done}: + case <-ctx.Done(): + return ctx.Err() + } + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + // needsIdleFlush returns true if the partition has exceeded the idle timeout // and the builder has some data buffered. func (p *processor) needsIdleFlush() bool { diff --git a/pkg/dataobj/consumer/processor_drain_test.go b/pkg/dataobj/consumer/processor_drain_test.go new file mode 100644 index 0000000000..3473f66e7e --- /dev/null +++ b/pkg/dataobj/consumer/processor_drain_test.go @@ -0,0 +1,58 @@ +package consumer + +import ( + "bytes" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" +) + +// TestProcessor_Stopping_Drain verifies that processor.stopping() drains +// buffered records from the channel and logs the correct outcome. +func TestProcessor_Stopping_Drain(t *testing.T) { + t.Run("empty channel logs clean drain", func(t *testing.T) { + var buf bytes.Buffer + logger := log.NewLogfmtLogger(&buf) + + reg := prometheus.NewRegistry() + builder := newTestBuilder(t, reg) + fc := &mockFlushCommitter{} + ch := make(chan *kgo.Record, 10) + + proc := newProcessor(builder, ch, fc, time.Hour, time.Hour, logger, reg) + + err := proc.stopping(nil) + require.NoError(t, err) + require.Contains(t, buf.String(), "inmemory channel drained cleanly on shutdown") + }) + + t.Run("records drained and logged clean", func(t *testing.T) { + var buf bytes.Buffer + logger := log.NewLogfmtLogger(&buf) + + reg := prometheus.NewRegistry() + builder := newTestBuilder(t, reg) + fc := &mockFlushCommitter{} + ch := make(chan *kgo.Record, 10) + + // Pre-fill 3 records into channel. + now := time.Now() + for i := 0; i < 3; i++ { + ch <- newTestRecord(t, "tenant1", now.Add(time.Duration(i)*time.Second)) + } + + proc := newProcessor(builder, ch, fc, time.Hour, time.Hour, logger, reg) + + err := proc.stopping(nil) + require.NoError(t, err) + require.True(t, + strings.Contains(buf.String(), "inmemory channel drained cleanly on shutdown"), + "expected clean drain log, got: %s", buf.String(), + ) + }) +} diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go index bb6cfa619c..ed615a16bf 100644 --- a/pkg/dataobj/consumer/service.go +++ b/pkg/dataobj/consumer/service.go @@ -1,6 +1,7 @@ package consumer import ( + "bytes" "context" "fmt" "strconv" @@ -17,7 +18,10 @@ import ( "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" + "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" + dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index" + "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore" dataobj_uploader "github.com/grafana/loki/v3/pkg/dataobj/uploader" "github.com/grafana/loki/v3/pkg/kafka" @@ -49,6 +53,213 @@ type Service struct { watcher *services.FailureWatcher logger log.Logger reg prometheus.Registerer + + // recordsChan is set only in inmemory mode. Callers send *kgo.Record values + // here instead of through Kafka. + recordsChan chan *kgo.Record +} + +// noopCommitter is a committer that does nothing. Used in inmemory mode where +// there is no Kafka offset to commit. +type noopCommitter struct{} + +func (noopCommitter) Commit(_ context.Context, _ int32, _ int64) error { return nil } + +// noopMetastoreEventEmitter is a metastoreEventEmitter that discards events. +// Used in inmemory mode where there is no Kafka metastore topic. +type noopMetastoreEventEmitter struct{} + +func (noopMetastoreEventEmitter) Emit(_ context.Context, _ string, _ time.Time) error { return nil } + +// tocFlusher wraps a flusher and builds an inline index after each log object flush. +// Used in inmemory mode so that flushed objects are queryable via ObjectMetastore +// without a Kafka-based index builder. +type tocFlusher struct { + inner flusher + logBucket objstore.Bucket // raw bucket for reading uploaded log objects + logTocWriter *metastore.TableOfContentsWriter // writes log-object ToC (for DataObjects / label queries) + indexCalculator *dataobjindex.Calculator // builds index from each log object + indexUploader uploader // uploads index objects to the index-prefixed bucket + indexTocWriter *metastore.TableOfContentsWriter // writes index-object ToC (for Sections queries) + logger log.Logger +} + +func (f *tocFlusher) Flush(ctx context.Context, b builder, reason string) (string, error) { + level.Info(f.logger).Log("msg", "tocFlusher.Flush called", "reason", reason) + + // Capture time ranges BEFORE the inner flush resets the builder. + logTimeRanges := b.TimeRanges() + + objectPath, err := f.inner.Flush(ctx, b, reason) + if err != nil { + return "", err + } + + // Write the log-object ToC entry so DataObjects() can discover it. + if err := f.logTocWriter.WriteEntry(ctx, objectPath, logTimeRanges); err != nil { + return objectPath, fmt.Errorf("failed to write log ToC entry: %w", err) + } + + // Build an inline index from the uploaded log object so that Sections() + // queries work without a separate Kafka-based index builder. + logObj, err := dataobj.FromBucket(ctx, f.logBucket, objectPath, 0) + if err != nil { + return objectPath, fmt.Errorf("failed to read log object for indexing %s: %w", objectPath, err) + } + + f.indexCalculator.Reset() + if err := f.indexCalculator.Calculate(ctx, f.logger, logObj, objectPath); err != nil { + return objectPath, fmt.Errorf("failed to calculate index for %s: %w", objectPath, err) + } + + // Capture index time ranges before Flush resets the calculator. + indexTimeRanges := f.indexCalculator.TimeRanges() + + indexObj, indexCloser, err := f.indexCalculator.Flush() + if err != nil { + return objectPath, fmt.Errorf("failed to flush index for %s: %w", objectPath, err) + } + defer indexCloser.Close() + + indexPath, err := f.indexUploader.Upload(ctx, indexObj) + if err != nil { + return objectPath, fmt.Errorf("failed to upload index for %s: %w", objectPath, err) + } + + if err := f.indexTocWriter.WriteEntry(ctx, indexPath, indexTimeRanges); err != nil { + return objectPath, fmt.Errorf("failed to write index ToC entry for %s: %w", objectPath, err) + } + + return objectPath, nil +} + +// RecordsChannel returns the in-process channel that callers use to submit +// records in inmemory mode. It is nil in Kafka mode. +func (s *Service) RecordsChannel() chan *kgo.Record { + return s.recordsChan +} + +// CheckReady returns nil if the service is running and ready to handle +// requests. Used by the readiness handler in loki.go to gate /ready. +func (s *Service) CheckReady(_ context.Context) error { + if s.State() != services.Running { + return fmt.Errorf("dataobj consumer is not running (state: %s)", s.State()) + } + return nil +} + +// NewInMemory creates a consumer Service that receives records via an in-process +// buffered channel instead of from Kafka. cfg.IngestMode must be IngestModeInMemory. +// mCfg is used to determine the index storage prefix so that flushed objects are +// queryable via ObjectMetastore. +func NewInMemory(cfg Config, mCfg metastore.Config, bucket objstore.Bucket, scratchStore scratch.Store, reg prometheus.Registerer, logger log.Logger) (*Service, error) { + if cfg.IngestMode != IngestModeInMemory { + return nil, fmt.Errorf("NewInMemory requires IngestMode=%q, got %q", IngestModeInMemory, cfg.IngestMode) + } + logger = log.With(logger, "component", "dataobj-consumer-inmemory") + + const partitionID = int32(0) + recordsChan := make(chan *kgo.Record, cfg.ChannelSize) + + s := &Service{ + cfg: cfg, + logger: logger, + reg: reg, + partition: partitionID, + recordsChan: recordsChan, + } + + uploader := dataobj_uploader.New(cfg.UploaderConfig, bucket, logger) + if err := uploader.RegisterMetrics(reg); err != nil { + level.Error(logger).Log("msg", "failed to register uploader metrics", "err", err) + } + builderFactory := logsobj.NewBuilderFactory(cfg.BuilderConfig, scratchStore) + sorter := logsobj.NewSorter(builderFactory, reg) + s.flusher = newFlusher(sorter, uploader, logger, reg) + + // In inmemory mode there is no Kafka metastore consumer or index builder, + // so we build a per-flush inline index and write both log ToC and index ToC + // entries directly, making flushed objects queryable via ObjectMetastore. + logTocWriter := metastore.NewTableOfContentsWriter(bucket, logger) + + // The index bucket uses the same prefix as ObjectMetastore so that + // GetIndexes() finds the index objects we upload. + indexBucket := objstore.NewPrefixedBucket(bucket, mCfg.IndexStoragePrefix) + indexUploader := dataobj_uploader.New(cfg.UploaderConfig, indexBucket, logger) + indexTocWriter := metastore.NewTableOfContentsWriter(indexBucket, logger) + + // Pre-create the tocs directory by uploading a zero-byte sentinel file. + // The filesystem bucket's GetAndReplace (used by TableOfContentsWriter) does NOT + // create parent directories, but Upload does. This is a no-op on cloud providers. + if err := indexBucket.Upload(context.Background(), "tocs/.init", bytes.NewReader(nil)); err != nil { + level.Debug(logger).Log("msg", "failed to pre-create index tocs directory", "err", err) + } + if err := bucket.Upload(context.Background(), "tocs/.init", bytes.NewReader(nil)); err != nil { + level.Debug(logger).Log("msg", "failed to pre-create log tocs directory", "err", err) + } + + // indexobj.Builder config: use the same base config as the log builder. + indexBuilder, err := indexobj.NewBuilder(cfg.BuilderBaseConfig, scratchStore) + if err != nil { + return nil, fmt.Errorf("failed to create index builder: %w", err) + } + + wrappedFlusher := &tocFlusher{ + inner: s.flusher, + logBucket: bucket, + logTocWriter: logTocWriter, + indexCalculator: dataobjindex.NewCalculator(indexBuilder), + indexUploader: indexUploader, + indexTocWriter: indexTocWriter, + logger: logger, + } + + wrapped := prometheus.WrapRegistererWith(prometheus.Labels{ + "partition": strconv.Itoa(int(partitionID)), + }, reg) + builder, err := builderFactory.NewBuilder(wrapped) + if err != nil { + return nil, fmt.Errorf("failed to initialize data object builder: %w", err) + } + + flushCommitter := newFlushCommitter( + wrappedFlusher, + noopMetastoreEventEmitter{}, + noopCommitter{}, + partitionID, + logger, + wrapped, + ) + s.processor = newProcessor( + builder, + recordsChan, + flushCommitter, + cfg.IdleFlushTimeout, + cfg.MaxBuilderAge, + logger, + wrapped, + ) + + s.Service = services.NewBasicService(s.inMemoryStarting, s.running, s.inMemoryStopping) + return s, nil +} + +func (s *Service) inMemoryStarting(ctx context.Context) error { + level.Info(s.logger).Log("msg", "starting inmemory dataobj consumer") + if err := services.StartAndAwaitRunning(ctx, s.processor); err != nil { + return fmt.Errorf("failed to start partition processor: %w", err) + } + return nil +} + +func (s *Service) inMemoryStopping(failureCase error) error { + level.Info(s.logger).Log("msg", "stopping inmemory dataobj consumer") + ctx := context.TODO() + if err := services.StopAndAwaitTerminated(ctx, s.processor); err != nil { + level.Warn(s.logger).Log("msg", "failed to stop partition processor", "err", err) + } + level.Info(s.logger).Log("msg", "stopped inmemory dataobj consumer") + return failureCase } func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objstore.Bucket, scratchStore scratch.Store, _ string, _ ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) (*Service, error) { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 56972f7e07..949bae0864 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -113,6 +113,8 @@ type Config struct { KafkaConfig kafka.Config `yaml:"-"` DataObjTeeConfig DataObjTeeConfig `yaml:"dataobj_tee"` + + InMemoryPushTimeout time.Duration `yaml:"inmemory_dataobj_push_timeout"` } // RegisterFlags registers distributor-related flags. @@ -129,12 +131,11 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.") fs.BoolVar(&cfg.IngestLimitsDryRunEnabled, "distributor.ingest-limits-dry-run-enabled", false, "Enable dry-run mode where limits are checked the ingest-limits service, but not enforced. Defaults to false.") + fs.DurationVar(&cfg.InMemoryPushTimeout, "distributor.inmemory-dataobj-push-timeout", 5*time.Second, + "Timeout for sending a record to the in-memory queue before returning backpressure to the caller. Defaults to 5s. Set to 0 for no timeout.") } func (cfg *Config) Validate() error { - if !cfg.KafkaEnabled && !cfg.IngesterEnabled { - return fmt.Errorf("at least one of kafka and ingestor writes must be enabled") - } if err := cfg.DataObjTeeConfig.Validate(); err != nil { return err } @@ -142,6 +143,9 @@ func (cfg *Config) Validate() error { if cfg.MaxDecompressedSize == 0 && cfg.MaxRecvMsgSize > 0 { cfg.MaxDecompressedSize = int64(cfg.MaxRecvMsgSize) * 50 } + if cfg.InMemoryPushTimeout < 0 { + return errors.New("distributor.inmemory-dataobj-push-timeout must be >= 0") + } return nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index e0731d5f30..4892f43518 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2801,12 +2801,13 @@ func TestConfig_Validate(t *testing.T) { expectedMaxDecompressedSize: 0, // Should remain 0 }, { - name: "validates kafka and ingester enabled", + // kafka=false + ingester=false is now allowed: in inmemory mode the tee is + // wired programmatically and does not require either flag. + name: "kafka=false ingester=false is valid (inmemory mode uses programmatic tee)", cfg: Config{ KafkaEnabled: false, IngesterEnabled: false, }, - expectedError: "at least one of kafka and ingestor writes must be enabled", }, } diff --git a/pkg/distributor/inmemory_dataobj_tee.go b/pkg/distributor/inmemory_dataobj_tee.go new file mode 100644 index 0000000000..2e849ea40a --- /dev/null +++ b/pkg/distributor/inmemory_dataobj_tee.go @@ -0,0 +1,117 @@ +package distributor + +import ( + "context" + "fmt" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/kafka" +) + +const ( + inmemoryTeeDefaultTopic = "loki" + inmemoryTeeDefaultMaxBufferedBytes = 100 << 20 // 100 MB +) + +// InMemoryDataObjTee implements the [Tee] interface and sends encoded log +// records to an in-process channel instead of to Kafka. It is used in +// single-binary mode with ingest_mode=inmemory. +type InMemoryDataObjTee struct { + records chan *kgo.Record + topic string + maxBufferedBytes int + pushTimeout time.Duration + logger log.Logger + + streams prometheus.Counter + streamFailures *prometheus.CounterVec +} + +// NewInMemoryDataObjTee returns a new InMemoryDataObjTee that sends encoded +// records to the given channel. reg and logger may be nil. +func NewInMemoryDataObjTee(records chan *kgo.Record, reg prometheus.Registerer, logger log.Logger, pushTimeout time.Duration) *InMemoryDataObjTee { + if logger == nil { + logger = log.NewNopLogger() + } + return &InMemoryDataObjTee{ + records: records, + topic: inmemoryTeeDefaultTopic, + maxBufferedBytes: inmemoryTeeDefaultMaxBufferedBytes, + pushTimeout: pushTimeout, + logger: logger, + streams: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "loki_distributor_inmemory_dataobj_tee_streams_total", + Help: "Total number of streams duplicated (both successful and failed) to the in-memory dataobj channel.", + }), + streamFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_distributor_inmemory_dataobj_tee_stream_failures_total", + Help: "Total number of streams that could not be duplicated to the in-memory dataobj channel.", + }, []string{"reason"}), + } +} + +// Register implements [Tee]. It adds all streams to the pushTracker's pending count +// so the distributor waits for them before concluding a push request. +func (t *InMemoryDataObjTee) Register(_ context.Context, _ string, streams []KeyedStream, pushTracker *PushTracker) { + pushTracker.streamsPending.Add(int32(len(streams))) +} + +// Duplicate implements [Tee]. It encodes each stream and sends it to the +// in-process channel, calling pushTracker.doneWithResult for each stream. +func (t *InMemoryDataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker) { + go func() { + for _, s := range streams { + t.duplicate(ctx, tenant, s, pushTracker) + } + }() +} + +func (t *InMemoryDataObjTee) duplicate(ctx context.Context, tenant string, stream KeyedStream, pushTracker *PushTracker) { + t.streams.Inc() + + records, err := kafka.EncodeWithTopic(t.topic, 0, tenant, stream.Stream, t.maxBufferedBytes) + if err != nil { + level.Error(t.logger).Log("msg", "failed to encode stream for in-memory tee", "err", err) + t.streamFailures.WithLabelValues("encode_error").Inc() + pushTracker.doneWithResult(fmt.Errorf("couldn't process request internally due to inmemory tee error: %d", TeeCouldntEncodeStreamError)) + return + } + + // Single timer for the whole stream batch. Using time.NewTimer + defer Stop + // avoids the leak caused by time.After inside a loop (each call creates a timer + // that lives until it fires, even if the select chose a different case). + // A nil channel blocks forever, so timeout stays nil when pushTimeout == 0. + // + // Note: if the channel send times out mid-batch, earlier records from this + // stream are already queued. The consumer will process them as a partial + // stream. This is acceptable in inmemory mode (no durability guarantees). + var timeout <-chan time.Time + if t.pushTimeout > 0 { + timer := time.NewTimer(t.pushTimeout) + defer timer.Stop() + timeout = timer.C + } + + for _, rec := range records { + select { + case t.records <- rec: + case <-timeout: + level.Error(t.logger).Log("msg", "in-memory dataobj tee channel full, dropping record", "tenant", tenant) + t.streamFailures.WithLabelValues("channel_full").Inc() + pushTracker.doneWithResult(fmt.Errorf("couldn't process request internally due to inmemory tee error: %d", TeeCouldntProduceRecordsError)) + return + case <-ctx.Done(): + t.streamFailures.WithLabelValues("cancellation").Inc() + pushTracker.doneWithResult(ctx.Err()) + return + } + } + + pushTracker.doneWithResult(nil) +} diff --git a/pkg/distributor/inmemory_dataobj_tee_test.go b/pkg/distributor/inmemory_dataobj_tee_test.go new file mode 100644 index 0000000000..08d9790040 --- /dev/null +++ b/pkg/distributor/inmemory_dataobj_tee_test.go @@ -0,0 +1,71 @@ +package distributor + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/logproto" +) + +func newTestInMemoryTee(capacity int) (*InMemoryDataObjTee, chan *kgo.Record) { + ch := make(chan *kgo.Record, capacity) + tee := NewInMemoryDataObjTee(ch, prometheus.NewRegistry(), nil, 5*time.Second) + return tee, ch +} + +func TestInMemoryDataObjTee_Register_AddsPending(t *testing.T) { + tee, _ := newTestInMemoryTee(10) + ctx := context.Background() + + streams := []KeyedStream{ + {Stream: logproto.Stream{Labels: `{a="b"}`}}, + {Stream: logproto.Stream{Labels: `{c="d"}`}}, + {Stream: logproto.Stream{Labels: `{e="f"}`}}, + } + pushTracker := &PushTracker{ + done: make(chan struct{}, 1), + err: make(chan error, 1), + } + + tee.Register(ctx, "tenant", streams, pushTracker) + assert.EqualValues(t, 3, pushTracker.streamsPending.Load()) +} + +func TestInMemoryDataObjTee_Duplicate_SendsRecords(t *testing.T) { + tee, ch := newTestInMemoryTee(100) + ctx := context.Background() + tenant := "test-tenant" + now := time.Now() + + streams := []KeyedStream{ + { + Stream: logproto.Stream{ + Labels: `{job="test"}`, + Entries: []logproto.Entry{ + {Timestamp: now, Line: "line1"}, + {Timestamp: now.Add(time.Second), Line: "line2"}, + }, + }, + }, + } + pushTracker := &PushTracker{ + done: make(chan struct{}, 1), + err: make(chan error, 1), + } + pushTracker.streamsPending.Store(1) + + tee.Duplicate(ctx, tenant, streams, pushTracker) + + // Wait for at least one record to arrive. + select { + case rec := <-ch: + assert.NotNil(t, rec) + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for record") + } +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 76731836a8..9a6ebeef56 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -693,7 +693,9 @@ func (t *Loki) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool // Ingester has a special check that makes sure that it was able to register into the ring, // and that all other ring entries are OK too. - if t.Ingester != nil { + // In inmemory dataobj mode the write path bypasses the ingester entirely, so skip this gate. + inMemoryDataObjMode := t.Cfg.DataObj.Enabled && t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory + if t.Ingester != nil && !inMemoryDataObjMode { if err := t.Ingester.CheckReady(r.Context()); err != nil { http.Error(w, fmt.Sprintf("Ingester not ready: %s", err), http.StatusServiceUnavailable) return @@ -735,6 +737,13 @@ func (t *Loki) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool } } + if t.dataObjConsumer != nil { + if err := t.dataObjConsumer.CheckReady(r.Context()); err != nil { + http.Error(w, fmt.Sprintf("DataObj Consumer not ready: %s", err), http.StatusServiceUnavailable) + return + } + } + http.Error(w, "ready", http.StatusOK) } } @@ -860,6 +869,14 @@ func (t *Loki) setupModuleManager() error { deps[All] = append(deps[All], IngestLimits, IngestLimitsFrontend) } + if t.Cfg.DataObj.Enabled { + deps[All] = append(deps[All], DataObjConsumer, DataObjIndexBuilder) + if t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory { + // DataObjConsumer must be initialized before Distributor so that + deps[Distributor] = append(deps[Distributor], DataObjConsumer) + } + } + if t.Cfg.Querier.PerRequestLimitsEnabled { level.Debug(util_log.Logger).Log("msg", "per-query request limits support enabled") mm.RegisterModule(QueryLimiter, t.initQueryLimiter, modules.UserInvisibleModule) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 23c65723cb..1ddd9c22c7 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -355,6 +355,15 @@ func (t *Loki) initDistributor() (services.Service, error) { return nil, errors.New("kafka is enabled in distributor but not in ingester") } + // In inmemory mode, wire the in-process tee before creating the distributor. + // DataObjTeeConfig.Enabled stays false so distributor.New() creates no internal Kafka tee. + if t.Cfg.DataObj.Enabled && t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory { + reg := prometheus.DefaultRegisterer + logger := log.With(util_log.Logger, "component", "inmemory-dataobj-tee") + inmemTee := distributor.NewInMemoryDataObjTee(t.dataObjConsumer.RecordsChannel(), reg, logger, t.Cfg.Distributor.InMemoryPushTimeout) + t.Tee = distributor.WrapTee(t.Tee, inmemTee) + } + // Add ingestion policy interceptors to ingester client t.Cfg.IngesterClient.GRPCUnaryClientInterceptors = append( t.Cfg.IngesterClient.GRPCUnaryClientInterceptors, @@ -2379,6 +2388,34 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) { return nil, err } + if t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory { + level.Warn(util_log.Logger).Log("msg", "inmemory ingest mode is experimental — no durability guarantees, single-replica only; each replica holds independent data") + level.Info(util_log.Logger).Log("msg", "initializing inmemory dataobj consumer") + svc, err := consumer.NewInMemory( + t.Cfg.DataObj.Consumer, + t.Cfg.DataObj.Metastore, + store, + t.scratchStore, + prometheus.DefaultRegisterer, + util_log.Logger, + ) + if err != nil { + return nil, err + } + t.dataObjConsumer = svc + + // Register the flush endpoint for inmemory mode (testing/operational use). + httpMiddleware := middleware.Merge( + serverutil.RecoveryHTTPMiddleware, + ) + t.Server.HTTP. + Methods(http.MethodPost). + Path("/dataobj-consumer/flush"). + Handler(httpMiddleware.Wrap(http.HandlerFunc(t.dataObjConsumer.FlushHandler))) + + return svc, nil + } + t.Cfg.DataObj.Consumer.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort level.Info(util_log.Logger).Log("msg", "initializing dataobj consumer", "instance", t.Cfg.Ingester.LifecyclerConfig.ID) @@ -2417,6 +2454,10 @@ func (t *Loki) initDataObjIndexBuilder() (services.Service, error) { if !t.Cfg.DataObj.Enabled { return nil, nil } + if t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory { + level.Info(util_log.Logger).Log("msg", "skipping dataobj index builder in inmemory mode; label queries will use full dataobj scan") + return nil, nil + } store, err := t.getDataObjBucket("dataobj-index-builder") if err != nil { return nil, err