chore: single binary without Kafka (#21318)

Signed-off-by: Shantanu Alshi <shantanu.alshi@grafana.com>
Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
pull/18598/merge
Shantanu Alshi 2 months ago committed by GitHub
parent 904c963d17
commit 368583e778
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 117
      cmd/loki/dataobj-inmemory-config.yaml
  2. 15
      docs/sources/shared/configuration.md
  3. 19
      integration/client/client.go
  4. 111
      integration/loki_dataobj_inmemory_pipeline_test.go
  5. 42
      pkg/dataobj/consumer/config.go
  6. 16
      pkg/dataobj/consumer/http.go
  7. 50
      pkg/dataobj/consumer/mock_test.go
  8. 95
      pkg/dataobj/consumer/processor.go
  9. 58
      pkg/dataobj/consumer/processor_drain_test.go
  10. 211
      pkg/dataobj/consumer/service.go
  11. 10
      pkg/distributor/distributor.go
  12. 5
      pkg/distributor/distributor_test.go
  13. 117
      pkg/distributor/inmemory_dataobj_tee.go
  14. 71
      pkg/distributor/inmemory_dataobj_tee_test.go
  15. 19
      pkg/loki/loki.go
  16. 41
      pkg/loki/modules.go

@ -0,0 +1,117 @@
# Single-binary Loki with dataobjects, no Kafka.
# Writes go to the in-process stream sink -> dataobj consumer -> object storage.
# No ingester/chunk writes; query via v2 engine (dataobjects).
auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: info
grpc_server_max_concurrent_streams: 1000
common:
instance_addr: 127.0.0.1
path_prefix: /tmp/loki
scratch_path: /tmp/loki/scratch
storage:
filesystem:
chunks_directory: /tmp/loki/chunks
rules_directory: /tmp/loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory
# Dataobj: in-memory ingest (no Kafka)
dataobj:
enabled: true
storage_bucket_prefix: "dataobj/"
consumer:
ingest_mode: inmemory
idle_flush_timeout: 2m
max_builder_age: 5m
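# Optional: channel_size: 10000 (record buffer used for inmemory ingestion).
# The push timeout is configured under `distributor:` as inmemory_dataobj_push_timeout (default 5s).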
metastore:
index_storage_prefix: "index/v0"
partition_ratio: 10
# No Kafka
# kafka_config is omitted; not used when ingest_mode=inmemory
# Querier
querier:
query_partition_ingesters: false
# V2 query engine: use dataobjects for all data (storage_lag=0 so no cutoff to chunks)
query_engine:
enable: true
enable_engine_router: false
storage_lag: 0
# Ingest limits off (they use Kafka)
ingest_limits:
enabled: false
ingest_limits_frontend:
num_partitions: 1
lifecycler:
ring:
kvstore:
store: inmemory
replication_factor: 1
# Ingester not used for writes (distributor sends only to dataobj stream sink)
ingester:
wal:
enabled: false
kafka_ingestion:
enabled: false
lifecycler:
id: "loki.local"
# Distributor: only dataobj in-memory sink (no Kafka, no ingester)
distributor:
kafka_writes_enabled: false
ingester_writes_enabled: false
ingest_limits_enabled: false
ingest_limits_dry_run_enabled: false
# Query range
query_range:
cache_volume_results: false
results_cache:
cache:
embedded_cache:
enabled: false
max_size_mb: 100
limits_config:
metric_aggregation_enabled: true
max_global_streams_per_user: 1000
# Schema: TSDB + filesystem object store (dataobj uses this for objects/index)
schema_config:
configs:
- from: 2020-10-24
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h
storage_config:
object_store:
filesystem:
dir: /tmp/loki
# Disabled to avoid internal push load while testing single-binary dataobj
pattern_ingester:
enabled: false
ruler:
alertmanager_url: http://localhost:9093
frontend:
encoding: protobuf

@ -1522,6 +1522,16 @@ dataobj:
# CLI flag: -dataobj-consumer.max-builder-age
[max_builder_age: <duration> | default = 1h]
# How records are ingested: "kafka" reads from a Kafka topic; "inmemory"
# uses an in-process channel (experimental, single-node, no durability
# guarantees, each replica holds independent data).
# CLI flag: -dataobj-consumer.ingest-mode
[ingest_mode: <string> | default = "kafka"]
# Internal buffer size for records for inmemory ingestion.
# CLI flag: -dataobj-consumer.channel-size
[channel_size: <int> | default = 10000]
# The name of the Kafka topic.
# CLI flag: -dataobj-consumer.topic
[topic: <string> | default = ""]
@ -3336,6 +3346,11 @@ dataobj_tee:
# to 0 to disable batching.
# CLI flag: -distributor.dataobj-tee.rate-batch-window
[rate_batch_window: <duration> | default = 0s]
# Timeout for sending a record to the in-memory queue before returning
# backpressure to the caller. Defaults to 5s. Set to 0 for no timeout.
# CLI flag: -distributor.inmemory-dataobj-push-timeout
[inmemory_dataobj_push_timeout: <duration> | default = 5s]
```
### etcd

@ -254,6 +254,25 @@ func (c *Client) Metrics() (string, error) {
return sb.String(), nil
}
// FlushDataobj asks the dataobj consumer to flush by sending a POST to the
// /dataobj-consumer/flush endpoint. Any 2xx response counts as success; every
// other status code is returned as an error.
func (c *Client) FlushDataobj() error {
	flushURL := fmt.Sprintf("%s/dataobj-consumer/flush", c.baseURL)

	request, err := c.request(context.Background(), "POST", flushURL)
	if err != nil {
		return err
	}

	response, err := c.httpClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode/100 != 2 {
		return fmt.Errorf("request failed with status code %d", response.StatusCode)
	}
	return nil
}
// Flush all in-memory chunks held by the ingesters to the backing store
func (c *Client) Flush() error {
req, err := c.request(context.Background(), "POST", fmt.Sprintf("%s/flush", c.baseURL))

@ -0,0 +1,111 @@
//go:build integration
package integration
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/integration/client"
"github.com/grafana/loki/v3/integration/cluster"
)
// TestInmemoryPipeline runs a single "all" target with the dataobj consumer in
// inmemory ingest mode and ingester writes disabled, then checks that pushed
// log lines become queryable. It covers two paths: an explicit flush through
// the /dataobj-consumer/flush endpoint and a flush triggered by the idle
// flush timeout.
func TestInmemoryPipeline(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()
// Small builder/page/section sizes and a 100ms idle flush timeout are passed
// as flags to the single component under test.
tAll := clu.AddComponent(
"all",
"-target=all",
"-dataobj.enabled=true",
"-dataobj-consumer.ingest-mode=inmemory",
"-dataobj-consumer.target-page-size=2KB",
"-dataobj-consumer.target-builder-memory-limit=1MB",
"-dataobj-consumer.buffer-size=256KB",
"-dataobj-consumer.target-section-size=512KB",
"-dataobj-consumer.section-stripe-merge-limit=2",
"-dataobj-consumer.sha-prefix-size=2",
"-dataobj-consumer.idle-flush-timeout=100ms",
"-distributor.ingester-writes-enabled=false",
"-pattern-ingester.enabled=false",
"-query-engine.enable=true",
"-query-engine.storage-lag=0",
)
// The scratch directory is created up front and handed to the component
// through common.scratch_path in the extra config below.
scratchPath := tAll.ClusterSharedPath() + "/scratch"
require.NoError(t, os.MkdirAll(scratchPath, 0o755))
tAll.WithExtraConfig(fmt.Sprintf(`
common:
scratch_path: %s
ring:
kvstore:
store: inmemory
ingest_limits:
enabled: false
`, scratchPath))
require.NoError(t, clu.Run())
tenantID := randStringRunes()
cli := client.New(tenantID, "", tAll.HTTPURL())
cli.Now = time.Now().Add(-30 * time.Second)
// round-trip: push two lines, force a flush, then poll the query API until
// both lines are returned.
t.Run("round-trip", func(t *testing.T) {
require.NoError(t, cli.PushLogLine("pipeline-line-1", cli.Now, nil, map[string]string{"job": "pipeline-test"}))
require.NoError(t, cli.PushLogLine("pipeline-line-2", cli.Now.Add(-time.Second), nil, map[string]string{"job": "pipeline-test"}))
require.NoError(t, cli.FlushDataobj())
var lines []string
require.Eventually(t, func() bool {
qresp, err := cli.RunRangeQuery(context.Background(), `{job="pipeline-test"}`)
if err != nil {
return false
}
// Rebuild the collected lines on every poll so earlier attempts do not accumulate.
lines = nil
for _, stream := range qresp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
return len(lines) >= 2
}, 30*time.Second, 100*time.Millisecond, "pushed logs never appeared in query results")
assert.ElementsMatch(t, []string{"pipeline-line-1", "pipeline-line-2"}, lines)
})
// idle-flush-timeout: uses a separate tenant and relies on the idle flush
// timeout instead of the flush endpoint.
t.Run("idle-flush-timeout", func(t *testing.T) {
tenantIdle := randStringRunes()
cliIdle := client.New(tenantIdle, "", tAll.HTTPURL())
cliIdle.Now = time.Now()
require.NoError(t, cliIdle.PushLogLine("idle-flush-line", cliIdle.Now, nil, map[string]string{"job": "idle-test"}))
// Do NOT call the flush endpoint — let the idle flush timeout (100ms) trigger naturally.
var lines []string
require.Eventually(t, func() bool {
qresp, err := cliIdle.RunRangeQuery(context.Background(), `{job="idle-test"}`)
if err != nil {
return false
}
// Rebuild the collected lines on every poll so earlier attempts do not accumulate.
lines = nil
for _, stream := range qresp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
return len(lines) >= 1
}, 15*time.Second, 200*time.Millisecond, "idle flush did not trigger within timeout")
assert.Contains(t, lines, "idle-flush-line")
})
}

@ -3,6 +3,7 @@ package consumer
import (
"errors"
"flag"
"fmt"
"time"
"github.com/grafana/dskit/ring"
@ -13,6 +14,16 @@ import (
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
// IngestMode selects the transport through which the consumer receives records.
type IngestMode string

// Supported ingest modes.
const (
	// IngestModeKafka consumes records from a Kafka topic. It is the default.
	IngestModeKafka = IngestMode("kafka")
	// IngestModeInMemory consumes records from an in-process Go channel, so
	// no Kafka is required.
	IngestModeInMemory = IngestMode("inmemory")
)
type Config struct {
logsobj.BuilderConfig
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
@ -21,6 +32,14 @@ type Config struct {
IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
MaxBuilderAge time.Duration `yaml:"max_builder_age"`
// IngestMode controls how records are ingested. "kafka" (default) reads from
// a Kafka topic; "inmemory" uses an in-process channel (no Kafka needed).
IngestMode IngestMode `yaml:"ingest_mode"`
// ChannelSize is the capacity of the buffered channel used to pass records
// from the distributor to the consumer in inmemory mode.
ChannelSize int `yaml:"channel_size"`
// This is temporary until we move to kafkav2.
Topic string `yaml:"topic"`
}
@ -35,8 +54,15 @@ func (cfg *Config) Validate() error {
if err := cfg.UploaderConfig.Validate(); err != nil {
return err
}
if cfg.Topic == "" {
return errors.New("topic is required")
switch cfg.IngestMode {
case IngestModeKafka, "": // empty defaults to kafka
if cfg.Topic == "" {
return errors.New("topic is required")
}
case IngestModeInMemory:
// topic not required in inmemory mode
default:
return fmt.Errorf("unknown ingest_mode %q: must be %q or %q", cfg.IngestMode, IngestModeKafka, IngestModeInMemory)
}
return nil
}
@ -50,6 +76,18 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix, f, util_log.Logger)
cfg.PartitionRingConfig.RegisterFlagsWithPrefix(prefix, f)
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
f.StringVar(
(*string)(&cfg.IngestMode),
prefix+"ingest-mode",
string(IngestModeKafka),
`How records are ingested: "kafka" reads from a Kafka topic; "inmemory" uses an in-process channel (experimental, single-node, no durability guarantees, each replica holds independent data).`,
)
f.IntVar(
&cfg.ChannelSize,
prefix+"channel-size",
10000,
`Internal buffer size for records for inmemory ingestion.`,
)
f.StringVar(
&cfg.Topic,
prefix+"topic",

@ -100,6 +100,22 @@ func (s *Service) cancelDelayedDownscale(w http.ResponseWriter, r *http.Request)
s.respondWithCurrentPartitionState(w, r)
}
// FlushHandler is an HTTP handler that forces the processor to flush right
// away, so tests and operational tooling do not have to wait for the idle or
// max-age timeouts.
//
// Responses:
//   - 503 when there is no processor or the service is not running,
//   - 500 with the error text when the flush fails,
//   - 204 when the flush succeeds.
func (s *Service) FlushHandler(w http.ResponseWriter, r *http.Request) {
	// State() is only consulted when a processor exists.
	ready := s.processor != nil && s.State() == services.Running
	if !ready {
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}

	err := s.processor.Flush(r.Context())
	if err == nil {
		w.WriteHeader(http.StatusNoContent)
		return
	}

	level.Error(s.logger).Log("msg", "flush failed", "err", err)
	http.Error(w, err.Error(), http.StatusInternalServerError)
}
func (s *Service) respondWithCurrentPartitionState(w http.ResponseWriter, r *http.Request) {
state, stateTimestamp, err := s.partitionInstanceLifecycler.GetPartitionState(r.Context())
if err != nil {

@ -50,8 +50,21 @@ func (m *mockBucket) Get(_ context.Context, name string) (io.ReadCloser, error)
return io.NopCloser(bytes.NewReader(data)), nil
}
func (m *mockBucket) GetRange(_ context.Context, _ string, _, _ int64) (io.ReadCloser, error) {
return nil, nil
// GetRange returns a reader over the bytes [off, off+length) of the uploaded
// object called name.
//
// The range is clamped to the end of the object. A negative length means
// "read from off to the end of the object". An unknown name or an offset
// outside [0, len(object)] is reported as an error.
func (m *mockBucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	data, exists := m.uploads[name]
	if !exists {
		return nil, errors.New("object not found")
	}

	size := int64(len(data))
	if off < 0 || off > size {
		return nil, errors.New("offset out of range")
	}

	// Default to the end of the object, and only shorten the range when a
	// non-negative length fits. Comparing against size-off (instead of
	// computing off+length first) avoids integer overflow, and the length >= 0
	// guard prevents a negative length from producing end < off, which would
	// make the slice expression panic.
	end := size
	if length >= 0 && length < size-off {
		end = off + length
	}
	return io.NopCloser(bytes.NewReader(data[off:end])), nil
}
func (m *mockBucket) Upload(_ context.Context, name string, r io.Reader) error {
@ -69,12 +82,37 @@ func (m *mockBucket) Iter(_ context.Context, _ string, _ func(string) error, _ .
return nil
}
// Name returns the fixed identifier of the mock bucket.
func (m *mockBucket) Name() string {
	return "mock"
}
func (m *mockBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) {
return objstore.ObjectAttributes{}, nil
// Attributes reports the size of the uploaded object called name. It returns
// an error when no object with that name has been uploaded.
func (m *mockBucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	var attrs objstore.ObjectAttributes
	stored, found := m.uploads[name]
	if !found {
		return attrs, errors.New("object not found")
	}
	attrs.Size = int64(len(stored))
	return attrs, nil
}
func (m *mockBucket) GetAndReplace(_ context.Context, name string, _ func(io.ReadCloser) (io.ReadCloser, error)) error {
return m.Upload(context.Background(), name, io.NopCloser(bytes.NewReader([]byte{})))
// GetAndReplace passes the current contents of name to fn and stores whatever
// fn returns as the new contents.
//
// fn receives a nil reader when the object does not exist yet. The mutex is
// released while fn runs and while its result is read, so the read and the
// write are two separate critical sections.
func (m *mockBucket) GetAndReplace(_ context.Context, name string, fn func(io.ReadCloser) (io.ReadCloser, error)) error {
	// Snapshot the current contents, if any.
	var current io.ReadCloser
	m.mu.Lock()
	if prev, found := m.uploads[name]; found {
		current = io.NopCloser(bytes.NewReader(prev))
	}
	m.mu.Unlock()

	replacement, err := fn(current)
	if err != nil {
		return err
	}
	defer replacement.Close()

	contents, err := io.ReadAll(replacement)
	if err != nil {
		return err
	}

	// Publish the new contents.
	m.mu.Lock()
	defer m.mu.Unlock()
	m.uploads[name] = contents
	return nil
}
func (m *mockBucket) IsAccessDeniedErr(_ error) bool {

@ -35,12 +35,19 @@ type flushCommitter interface {
}
// flushRequest is used to send a flush request to the processor's Run loop.
// The outcome of the flush is sent back on done.
type flushRequest struct {
// done receives the flush result (nil when nothing failed).
done chan<- error
}

// A processor receives records and builds data objects from them.
type processor struct {
*services.BasicService
builder builder
decoder *kafka.Decoder
records chan *kgo.Record
flushCommitter flushCommitter
// flushRequests is used to safely trigger a flush from outside the Run loop.
flushRequests chan flushRequest
// lastOffset contains the offset of the last record appended to the data object
// builder. It is used to commit the correct offset after a flush.
@ -99,6 +106,7 @@ func newProcessor(
decoder: decoder,
records: records,
flushCommitter: flushCommitter,
flushRequests: make(chan flushRequest, 1),
idleFlushTimeout: idleFlushTimeout,
maxBuilderAge: maxBuilderAge,
metrics: newMetrics(reg),
@ -119,8 +127,53 @@ func (p *processor) running(ctx context.Context) error {
return p.Run(ctx)
}
// stopping implements [services.StoppingFn]. It drains any records that are
// already buffered in the in-process channel (bounded by a 30s timeout) and
// then flushes whatever the builder accumulated. This ensures that records
// buffered at SIGTERM are not silently lost in inmemory mode.
//
// Note: stopping() is called after Run() returns (dskit guarantees
// RunningFn happens-before StoppingFn), so there is no race with the run loop.
// The records channel remains open (owned by Service) and may still have
// buffered records written by the distributor before the push timeout fired.
//
// The final flush runs under its own timeout instead of the drain context:
// if the drain deadline has already expired, reusing that context would make
// the flush fail immediately and discard every record drained so far.
//
// Failures are logged; the method always returns nil.
func (p *processor) stopping(_ error) error {
	const shutdownTimeout = 30 * time.Second

	drainCtx, cancelDrain := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancelDrain()

	dropped := 0
drain:
	for {
		// Check the deadline before receiving. A select with several ready
		// cases picks one at random, so relying on a Done case alone could
		// keep draining past the deadline while records are available.
		if drainCtx.Err() != nil {
			// Drain timed out — count remaining buffered records as dropped.
			dropped = len(p.records)
			break drain
		}
		select {
		case rec, ok := <-p.records:
			if !ok {
				break drain
			}
			if err := p.processRecord(drainCtx, rec); err != nil {
				level.Error(p.logger).Log("msg", "failed to process record during shutdown drain", "err", err)
				p.observeRecordErr(rec)
			}
		default:
			// Channel is empty — drain complete.
			break drain
		}
	}

	if dropped > 0 {
		level.Warn(p.logger).Log("msg", "inmemory drain timed out, records dropped", "count", dropped)
	} else {
		level.Info(p.logger).Log("msg", "inmemory channel drained cleanly on shutdown")
	}

	// Flush whatever was accumulated during drain.
	if !p.lastAppend.IsZero() && p.builder.GetEstimatedSize() > 0 {
		flushCtx, cancelFlush := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancelFlush()
		if err := p.flush(flushCtx, "shutdown"); err != nil {
			level.Error(p.logger).Log("msg", "failed to flush during shutdown drain", "err", err)
		}
	}
	return nil
}
@ -149,6 +202,29 @@ func (p *processor) Run(ctx context.Context) error {
if _, err := p.idleFlush(ctx); err != nil {
level.Error(p.logger).Log("msg", "failed to idle flush", "err", err)
}
case req := <-p.flushRequests:
// Drain any records that are already in the channel before flushing
// so that all pending data is included in the flush.
drain:
for {
select {
case rec, ok := <-p.records:
if !ok {
break drain
}
if err := p.processRecord(ctx, rec); err != nil {
level.Error(p.logger).Log("msg", "failed to process record during flush drain", "err", err)
p.observeRecordErr(rec)
}
default:
break drain
}
}
var err error
if !p.lastAppend.IsZero() && p.builder.GetEstimatedSize() > 0 {
err = p.flush(ctx, flushReasonIdle)
}
req.done <- err
}
}
}
@ -222,6 +298,23 @@ func (p *processor) idleFlush(ctx context.Context) (bool, error) {
return true, nil
}
// Flush hands a flush request to the processor's Run loop and waits for the
// result. The Run loop drains pending records from the channel before it
// flushes. Flush may be called from any goroutine.
//
// If ctx is done before the request is accepted or before the result arrives,
// ctx.Err() is returned.
func (p *processor) Flush(ctx context.Context) error {
	// Buffered with capacity one so the Run loop can deliver the result
	// without blocking, even when the caller has already given up.
	result := make(chan error, 1)
	request := flushRequest{done: result}

	// Step 1: submit the request.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case p.flushRequests <- request:
	}

	// Step 2: wait for the outcome.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case flushErr := <-result:
		return flushErr
	}
}
// needsIdleFlush returns true if the partition has exceeded the idle timeout
// and the builder has some data buffered.
func (p *processor) needsIdleFlush() bool {

@ -0,0 +1,58 @@
package consumer
import (
"bytes"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
)
// TestProcessor_Stopping_Drain verifies that processor.stopping() drains
// buffered records from the channel and logs the correct outcome.
//
// Each case pre-fills the records channel with a number of records, calls
// stopping(), and then asserts that the channel is empty and that the clean
// drain message was logged.
func TestProcessor_Stopping_Drain(t *testing.T) {
	tests := []struct {
		name    string
		records int // number of records buffered before stopping() runs
	}{
		{name: "empty channel logs clean drain", records: 0},
		{name: "records drained and logged clean", records: 3},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			var buf bytes.Buffer
			logger := log.NewLogfmtLogger(&buf)
			reg := prometheus.NewRegistry()
			builder := newTestBuilder(t, reg)
			fc := &mockFlushCommitter{}
			ch := make(chan *kgo.Record, 10)

			// Pre-fill the channel before the processor is stopped.
			now := time.Now()
			for i := 0; i < tc.records; i++ {
				ch <- newTestRecord(t, "tenant1", now.Add(time.Duration(i)*time.Second))
			}

			proc := newProcessor(builder, ch, fc, time.Hour, time.Hour, logger, reg)
			require.NoError(t, proc.stopping(nil))

			// The log line alone does not prove the records were consumed.
			require.Empty(t, ch, "stopping() must consume every buffered record")
			require.True(t,
				strings.Contains(buf.String(), "inmemory channel drained cleanly on shutdown"),
				"expected clean drain log, got: %s", buf.String(),
			)
		})
	}
}

@ -1,6 +1,7 @@
package consumer
import (
"bytes"
"context"
"fmt"
"strconv"
@ -17,7 +18,10 @@ import (
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index"
"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
dataobj_uploader "github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/kafka"
@ -49,6 +53,213 @@ type Service struct {
watcher *services.FailureWatcher
logger log.Logger
reg prometheus.Registerer
// recordsChan is set only in inmemory mode. Callers send *kgo.Record values
// here instead of through Kafka.
recordsChan chan *kgo.Record
}
// noopCommitter is a committer whose Commit does no work. It is used in
// inmemory mode, where there is no Kafka offset to commit.
type noopCommitter struct{}

// Commit ignores all of its arguments and always reports success.
func (noopCommitter) Commit(context.Context, int32, int64) error {
	return nil
}
// noopMetastoreEventEmitter is a metastoreEventEmitter that throws every
// event away. It is used in inmemory mode, where there is no Kafka metastore
// topic.
type noopMetastoreEventEmitter struct{}

// Emit discards the event and always reports success.
func (noopMetastoreEventEmitter) Emit(context.Context, string, time.Time) error {
	return nil
}
// tocFlusher wraps a flusher and builds an inline index after each log object flush.
// Used in inmemory mode so that flushed objects are queryable via ObjectMetastore
// without a Kafka-based index builder.
//
// After the wrapped flusher has produced an object path, Flush writes a log
// ToC entry, computes and uploads an index for that object, and writes an
// index ToC entry.
type tocFlusher struct {
// inner performs the actual flush and returns the path of the flushed object.
inner flusher
logBucket objstore.Bucket // raw bucket for reading uploaded log objects
logTocWriter *metastore.TableOfContentsWriter // writes log-object ToC (for DataObjects / label queries)
indexCalculator *dataobjindex.Calculator // builds index from each log object
indexUploader uploader // uploads index objects to the index-prefixed bucket
indexTocWriter *metastore.TableOfContentsWriter // writes index-object ToC (for Sections queries)
// logger is used by Flush and is also handed to the index calculator.
logger log.Logger
}
// Flush delegates to the wrapped flusher and then makes the flushed object
// discoverable: it writes a log ToC entry for the object, reads the object
// back from logBucket, calculates an index from it, uploads that index and
// writes an index ToC entry.
//
// If the inner flush fails, the returned path is empty. Once the inner flush
// has succeeded, every later failure is returned together with the object
// path (NOTE(review): confirm callers tolerate a non-empty path alongside an
// error).
func (f *tocFlusher) Flush(ctx context.Context, b builder, reason string) (string, error) {
level.Info(f.logger).Log("msg", "tocFlusher.Flush called", "reason", reason)
// Capture time ranges BEFORE the inner flush resets the builder.
logTimeRanges := b.TimeRanges()
objectPath, err := f.inner.Flush(ctx, b, reason)
if err != nil {
return "", err
}
// Write the log-object ToC entry so DataObjects() can discover it.
if err := f.logTocWriter.WriteEntry(ctx, objectPath, logTimeRanges); err != nil {
return objectPath, fmt.Errorf("failed to write log ToC entry: %w", err)
}
// Build an inline index from the uploaded log object so that Sections()
// queries work without a separate Kafka-based index builder.
logObj, err := dataobj.FromBucket(ctx, f.logBucket, objectPath, 0)
if err != nil {
return objectPath, fmt.Errorf("failed to read log object for indexing %s: %w", objectPath, err)
}
// The calculator is shared across flushes, so it is reset before each use.
f.indexCalculator.Reset()
if err := f.indexCalculator.Calculate(ctx, f.logger, logObj, objectPath); err != nil {
return objectPath, fmt.Errorf("failed to calculate index for %s: %w", objectPath, err)
}
// Capture index time ranges before Flush resets the calculator.
indexTimeRanges := f.indexCalculator.TimeRanges()
indexObj, indexCloser, err := f.indexCalculator.Flush()
if err != nil {
return objectPath, fmt.Errorf("failed to flush index for %s: %w", objectPath, err)
}
// The closer stays open until the index has been uploaded and recorded.
defer indexCloser.Close()
indexPath, err := f.indexUploader.Upload(ctx, indexObj)
if err != nil {
return objectPath, fmt.Errorf("failed to upload index for %s: %w", objectPath, err)
}
if err := f.indexTocWriter.WriteEntry(ctx, indexPath, indexTimeRanges); err != nil {
return objectPath, fmt.Errorf("failed to write index ToC entry for %s: %w", objectPath, err)
}
return objectPath, nil
}
// RecordsChannel returns the in-process channel that callers use to submit
// records in inmemory mode. It is nil in Kafka mode.
//
// The returned value is the Service's recordsChan field as-is; no copy or
// wrapper is created, so senders and the consumer share the same channel.
func (s *Service) RecordsChannel() chan *kgo.Record {
return s.recordsChan
}
// CheckReady returns nil if the service is running and ready to handle
// requests. Used by the readiness handler in loki.go to gate /ready.
//
// When the service is in any other state, the returned error names that
// state.
func (s *Service) CheckReady(_ context.Context) error {
	// Read the state once so the state reported in the error is the one that
	// was actually tested; a second read could observe a later transition.
	if state := s.State(); state != services.Running {
		return fmt.Errorf("dataobj consumer is not running (state: %s)", state)
	}
	return nil
}
// NewInMemory creates a consumer Service that receives records via an in-process
// buffered channel instead of from Kafka. cfg.IngestMode must be IngestModeInMemory
// and cfg.ChannelSize must not be negative; otherwise an error is returned.
// mCfg is used to determine the index storage prefix so that flushed objects are
// queryable via ObjectMetastore.
//
// The returned Service uses a single fixed partition (0), a no-op offset
// committer and a no-op metastore event emitter. Flushes go through a
// tocFlusher that writes log and index ToC entries inline.
func NewInMemory(cfg Config, mCfg metastore.Config, bucket objstore.Bucket, scratchStore scratch.Store, reg prometheus.Registerer, logger log.Logger) (*Service, error) {
	if cfg.IngestMode != IngestModeInMemory {
		return nil, fmt.Errorf("NewInMemory requires IngestMode=%q, got %q", IngestModeInMemory, cfg.IngestMode)
	}
	// make panics on a negative channel capacity, so reject it here with a
	// regular error instead of crashing during startup.
	if cfg.ChannelSize < 0 {
		return nil, fmt.Errorf("channel_size must be >= 0, got %d", cfg.ChannelSize)
	}

	logger = log.With(logger, "component", "dataobj-consumer-inmemory")

	const partitionID = int32(0)

	recordsChan := make(chan *kgo.Record, cfg.ChannelSize)

	s := &Service{
		cfg:         cfg,
		logger:      logger,
		reg:         reg,
		partition:   partitionID,
		recordsChan: recordsChan,
	}

	logUploader := dataobj_uploader.New(cfg.UploaderConfig, bucket, logger)
	if err := logUploader.RegisterMetrics(reg); err != nil {
		level.Error(logger).Log("msg", "failed to register uploader metrics", "err", err)
	}

	builderFactory := logsobj.NewBuilderFactory(cfg.BuilderConfig, scratchStore)
	sorter := logsobj.NewSorter(builderFactory, reg)
	s.flusher = newFlusher(sorter, logUploader, logger, reg)

	// In inmemory mode there is no Kafka metastore consumer or index builder,
	// so we build a per-flush inline index and write both log ToC and index ToC
	// entries directly, making flushed objects queryable via ObjectMetastore.
	logTocWriter := metastore.NewTableOfContentsWriter(bucket, logger)

	// The index bucket uses the same prefix as ObjectMetastore so that
	// GetIndexes() finds the index objects we upload.
	indexBucket := objstore.NewPrefixedBucket(bucket, mCfg.IndexStoragePrefix)
	indexUploader := dataobj_uploader.New(cfg.UploaderConfig, indexBucket, logger)
	indexTocWriter := metastore.NewTableOfContentsWriter(indexBucket, logger)

	// Pre-create the tocs directory by uploading a zero-byte sentinel file.
	// The filesystem bucket's GetAndReplace (used by TableOfContentsWriter) does NOT
	// create parent directories, but Upload does. This is a no-op on cloud providers.
	// Failures are only logged: this step is best-effort.
	if err := indexBucket.Upload(context.Background(), "tocs/.init", bytes.NewReader(nil)); err != nil {
		level.Debug(logger).Log("msg", "failed to pre-create index tocs directory", "err", err)
	}
	if err := bucket.Upload(context.Background(), "tocs/.init", bytes.NewReader(nil)); err != nil {
		level.Debug(logger).Log("msg", "failed to pre-create log tocs directory", "err", err)
	}

	// indexobj.Builder config: use the same base config as the log builder.
	indexBuilder, err := indexobj.NewBuilder(cfg.BuilderBaseConfig, scratchStore)
	if err != nil {
		return nil, fmt.Errorf("failed to create index builder: %w", err)
	}

	wrappedFlusher := &tocFlusher{
		inner:           s.flusher,
		logBucket:       bucket,
		logTocWriter:    logTocWriter,
		indexCalculator: dataobjindex.NewCalculator(indexBuilder),
		indexUploader:   indexUploader,
		indexTocWriter:  indexTocWriter,
		logger:          logger,
	}

	// Per-partition metrics carry a "partition" label.
	wrapped := prometheus.WrapRegistererWith(prometheus.Labels{
		"partition": strconv.Itoa(int(partitionID)),
	}, reg)

	logBuilder, err := builderFactory.NewBuilder(wrapped)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize data object builder: %w", err)
	}

	committer := newFlushCommitter(
		wrappedFlusher,
		noopMetastoreEventEmitter{},
		noopCommitter{},
		partitionID,
		logger,
		wrapped,
	)

	s.processor = newProcessor(
		logBuilder,
		recordsChan,
		committer,
		cfg.IdleFlushTimeout,
		cfg.MaxBuilderAge,
		logger,
		wrapped,
	)

	s.Service = services.NewBasicService(s.inMemoryStarting, s.running, s.inMemoryStopping)
	return s, nil
}
// inMemoryStarting is the starting function of the inmemory Service. It
// starts the processor and waits until it is running; a failure to do so is
// returned wrapped.
func (s *Service) inMemoryStarting(ctx context.Context) error {
	level.Info(s.logger).Log("msg", "starting inmemory dataobj consumer")

	err := services.StartAndAwaitRunning(ctx, s.processor)
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to start partition processor: %w", err)
}
// inMemoryStopping is the stopping function of the inmemory Service. It stops
// the processor and waits for it to terminate. A failure to stop the
// processor is only logged; the method returns failureCase unchanged.
func (s *Service) inMemoryStopping(failureCase error) error {
	level.Info(s.logger).Log("msg", "stopping inmemory dataobj consumer")

	stopErr := services.StopAndAwaitTerminated(context.TODO(), s.processor)
	if stopErr != nil {
		level.Warn(s.logger).Log("msg", "failed to stop partition processor", "err", stopErr)
	}

	level.Info(s.logger).Log("msg", "stopped inmemory dataobj consumer")
	return failureCase
}
func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objstore.Bucket, scratchStore scratch.Store, _ string, _ ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) (*Service, error) {

@ -113,6 +113,8 @@ type Config struct {
KafkaConfig kafka.Config `yaml:"-"`
DataObjTeeConfig DataObjTeeConfig `yaml:"dataobj_tee"`
InMemoryPushTimeout time.Duration `yaml:"inmemory_dataobj_push_timeout"`
}
// RegisterFlags registers distributor-related flags.
@ -129,12 +131,11 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.")
fs.BoolVar(&cfg.IngestLimitsDryRunEnabled, "distributor.ingest-limits-dry-run-enabled", false, "Enable dry-run mode where limits are checked the ingest-limits service, but not enforced. Defaults to false.")
fs.DurationVar(&cfg.InMemoryPushTimeout, "distributor.inmemory-dataobj-push-timeout", 5*time.Second,
"Timeout for sending a record to the in-memory queue before returning backpressure to the caller. Defaults to 5s. Set to 0 for no timeout.")
}
func (cfg *Config) Validate() error {
if !cfg.KafkaEnabled && !cfg.IngesterEnabled {
return fmt.Errorf("at least one of kafka and ingestor writes must be enabled")
}
if err := cfg.DataObjTeeConfig.Validate(); err != nil {
return err
}
@ -142,6 +143,9 @@ func (cfg *Config) Validate() error {
if cfg.MaxDecompressedSize == 0 && cfg.MaxRecvMsgSize > 0 {
cfg.MaxDecompressedSize = int64(cfg.MaxRecvMsgSize) * 50
}
if cfg.InMemoryPushTimeout < 0 {
return errors.New("distributor.inmemory-dataobj-push-timeout must be >= 0")
}
return nil
}

@ -2801,12 +2801,13 @@ func TestConfig_Validate(t *testing.T) {
expectedMaxDecompressedSize: 0, // Should remain 0
},
{
name: "validates kafka and ingester enabled",
// kafka=false + ingester=false is now allowed: in inmemory mode the tee is
// wired programmatically and does not require either flag.
name: "kafka=false ingester=false is valid (inmemory mode uses programmatic tee)",
cfg: Config{
KafkaEnabled: false,
IngesterEnabled: false,
},
expectedError: "at least one of kafka and ingestor writes must be enabled",
},
}

@ -0,0 +1,117 @@
package distributor
import (
"context"
"fmt"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/kafka"
)
// Defaults that NewInMemoryDataObjTee assigns to every tee it creates.
const (
	// inmemoryTeeDefaultTopic is the topic name passed to the record encoder.
	inmemoryTeeDefaultTopic = "loki"
	// inmemoryTeeDefaultMaxBufferedBytes is the size limit passed to the
	// record encoder: 100 MB.
	inmemoryTeeDefaultMaxBufferedBytes = 100 * 1024 * 1024
)
// InMemoryDataObjTee implements the [Tee] interface and sends encoded log
// records to an in-process channel instead of to Kafka. It is used in
// single-binary mode with ingest_mode=inmemory.
type InMemoryDataObjTee struct {
// records is the channel that encoded records are sent to.
records chan *kgo.Record
// topic is the topic name passed to the encoder for every stream.
topic string
// maxBufferedBytes is the size limit passed to the encoder.
maxBufferedBytes int
// pushTimeout bounds how long sending a stream's records may wait for
// channel capacity; no timer is armed when it is not positive.
pushTimeout time.Duration
logger log.Logger
// streams counts every stream handed to the tee.
streams prometheus.Counter
// streamFailures counts streams that could not be duplicated, labelled by "reason".
streamFailures *prometheus.CounterVec
}
// NewInMemoryDataObjTee builds an InMemoryDataObjTee that sends encoded
// records to the records channel. reg and logger may be nil; a nil logger is
// replaced with a no-op logger. pushTimeout is stored as-is on the tee.
func NewInMemoryDataObjTee(records chan *kgo.Record, reg prometheus.Registerer, logger log.Logger, pushTimeout time.Duration) *InMemoryDataObjTee {
	if logger == nil {
		logger = log.NewNopLogger()
	}

	// Create the metrics first, in the same order as the struct fields.
	factory := promauto.With(reg)
	streamsTotal := factory.NewCounter(prometheus.CounterOpts{
		Name: "loki_distributor_inmemory_dataobj_tee_streams_total",
		Help: "Total number of streams duplicated (both successful and failed) to the in-memory dataobj channel.",
	})
	failuresTotal := factory.NewCounterVec(prometheus.CounterOpts{
		Name: "loki_distributor_inmemory_dataobj_tee_stream_failures_total",
		Help: "Total number of streams that could not be duplicated to the in-memory dataobj channel.",
	}, []string{"reason"})

	tee := &InMemoryDataObjTee{
		records:          records,
		topic:            inmemoryTeeDefaultTopic,
		maxBufferedBytes: inmemoryTeeDefaultMaxBufferedBytes,
		pushTimeout:      pushTimeout,
		logger:           logger,
		streams:          streamsTotal,
		streamFailures:   failuresTotal,
	}
	return tee
}
// Register implements [Tee]. Every stream is added to the push tracker's
// pending count so that the distributor waits for all of them before it
// concludes the push request. The context and tenant are unused.
func (t *InMemoryDataObjTee) Register(_ context.Context, _ string, streams []KeyedStream, pushTracker *PushTracker) {
	pending := int32(len(streams))
	pushTracker.streamsPending.Add(pending)
}
// Duplicate implements [Tee]. It returns immediately; the streams are encoded
// and sent to the in-process channel one after another on a background
// goroutine, and pushTracker.doneWithResult is called once per stream.
func (t *InMemoryDataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker) {
	sendAll := func() {
		for i := range streams {
			t.duplicate(ctx, tenant, streams[i], pushTracker)
		}
	}
	go sendAll()
}
// duplicate encodes a single stream and pushes each resulting record onto the
// in-process channel, reporting the outcome to pushTracker exactly once.
//
// Failures are counted in streamFailures under one of three reasons:
// "encode_error", "channel_full" (pushTimeout elapsed before every record was
// accepted) or "cancellation" (ctx was done).
func (t *InMemoryDataObjTee) duplicate(ctx context.Context, tenant string, stream KeyedStream, pushTracker *PushTracker) {
	t.streams.Inc()

	encoded, encodeErr := kafka.EncodeWithTopic(t.topic, 0, tenant, stream.Stream, t.maxBufferedBytes)
	if encodeErr != nil {
		level.Error(t.logger).Log("msg", "failed to encode stream for in-memory tee", "err", encodeErr)
		t.streamFailures.WithLabelValues("encode_error").Inc()
		pushTracker.doneWithResult(fmt.Errorf("couldn't process request internally due to inmemory tee error: %d", TeeCouldntEncodeStreamError))
		return
	}

	// One timer covers all records of this stream, instead of a fresh
	// time.After per loop iteration. When pushTimeout <= 0 the deadline
	// channel stays nil; receiving from a nil channel blocks forever, so
	// that select case is simply never chosen.
	//
	// If the deadline fires part-way through, the records already sent stay
	// queued and will be consumed as a partial stream. That is accepted in
	// inmemory mode, which gives no durability guarantees.
	var deadline <-chan time.Time
	if t.pushTimeout > 0 {
		pushTimer := time.NewTimer(t.pushTimeout)
		defer pushTimer.Stop()
		deadline = pushTimer.C
	}

	// result stays nil unless sending is aborted below.
	var result error

sendLoop:
	for i := range encoded {
		select {
		case t.records <- encoded[i]:
			// Record accepted; move on to the next one.
		case <-deadline:
			level.Error(t.logger).Log("msg", "in-memory dataobj tee channel full, dropping record", "tenant", tenant)
			t.streamFailures.WithLabelValues("channel_full").Inc()
			result = fmt.Errorf("couldn't process request internally due to inmemory tee error: %d", TeeCouldntProduceRecordsError)
			break sendLoop
		case <-ctx.Done():
			t.streamFailures.WithLabelValues("cancellation").Inc()
			result = ctx.Err()
			break sendLoop
		}
	}

	pushTracker.doneWithResult(result)
}

@ -0,0 +1,71 @@
package distributor
import (
"context"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/logproto"
)
// newTestInMemoryTee returns a tee backed by a buffered record channel of the
// given capacity, together with that channel. The tee uses a fresh Prometheus
// registry, a nil logger and a 5s push timeout.
func newTestInMemoryTee(capacity int) (*InMemoryDataObjTee, chan *kgo.Record) {
	records := make(chan *kgo.Record, capacity)
	return NewInMemoryDataObjTee(records, prometheus.NewRegistry(), nil, 5*time.Second), records
}
func TestInMemoryDataObjTee_Register_AddsPending(t *testing.T) {
tee, _ := newTestInMemoryTee(10)
ctx := context.Background()
streams := []KeyedStream{
{Stream: logproto.Stream{Labels: `{a="b"}`}},
{Stream: logproto.Stream{Labels: `{c="d"}`}},
{Stream: logproto.Stream{Labels: `{e="f"}`}},
}
pushTracker := &PushTracker{
done: make(chan struct{}, 1),
err: make(chan error, 1),
}
tee.Register(ctx, "tenant", streams, pushTracker)
assert.EqualValues(t, 3, pushTracker.streamsPending.Load())
}
// TestInMemoryDataObjTee_Duplicate_SendsRecords checks that duplicating a
// stream with two entries delivers at least one non-nil record on the tee's
// channel within five seconds.
func TestInMemoryDataObjTee_Duplicate_SendsRecords(t *testing.T) {
	tee, records := newTestInMemoryTee(100)

	base := time.Now()
	stream := logproto.Stream{
		Labels: `{job="test"}`,
		Entries: []logproto.Entry{
			{Timestamp: base, Line: "line1"},
			{Timestamp: base.Add(time.Second), Line: "line2"},
		},
	}

	tracker := &PushTracker{
		done: make(chan struct{}, 1),
		err:  make(chan error, 1),
	}
	tracker.streamsPending.Store(1)

	tee.Duplicate(context.Background(), "test-tenant", []KeyedStream{{Stream: stream}}, tracker)

	// Duplicate sends from a background goroutine, so wait for the first
	// record to show up rather than reading the channel straight away.
	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()

	select {
	case got := <-records:
		assert.NotNil(t, got)
	case <-deadline.C:
		t.Fatal("timed out waiting for record")
	}
}

@ -693,7 +693,9 @@ func (t *Loki) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool
// Ingester has a special check that makes sure that it was able to register into the ring,
// and that all other ring entries are OK too.
if t.Ingester != nil {
// In inmemory dataobj mode the write path bypasses the ingester entirely, so skip this gate.
inMemoryDataObjMode := t.Cfg.DataObj.Enabled && t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory
if t.Ingester != nil && !inMemoryDataObjMode {
if err := t.Ingester.CheckReady(r.Context()); err != nil {
http.Error(w, fmt.Sprintf("Ingester not ready: %s", err), http.StatusServiceUnavailable)
return
@ -735,6 +737,13 @@ func (t *Loki) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool
}
}
if t.dataObjConsumer != nil {
if err := t.dataObjConsumer.CheckReady(r.Context()); err != nil {
http.Error(w, fmt.Sprintf("DataObj Consumer not ready: %s", err), http.StatusServiceUnavailable)
return
}
}
http.Error(w, "ready", http.StatusOK)
}
}
@ -860,6 +869,14 @@ func (t *Loki) setupModuleManager() error {
deps[All] = append(deps[All], IngestLimits, IngestLimitsFrontend)
}
if t.Cfg.DataObj.Enabled {
deps[All] = append(deps[All], DataObjConsumer, DataObjIndexBuilder)
if t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory {
// DataObjConsumer must be initialized before Distributor so that
// t.dataObjConsumer is set when initDistributor wires the in-memory tee.
deps[Distributor] = append(deps[Distributor], DataObjConsumer)
}
}
if t.Cfg.Querier.PerRequestLimitsEnabled {
level.Debug(util_log.Logger).Log("msg", "per-query request limits support enabled")
mm.RegisterModule(QueryLimiter, t.initQueryLimiter, modules.UserInvisibleModule)

@ -355,6 +355,15 @@ func (t *Loki) initDistributor() (services.Service, error) {
return nil, errors.New("kafka is enabled in distributor but not in ingester")
}
// In inmemory mode, wire the in-process tee before creating the distributor.
// DataObjTeeConfig.Enabled stays false so distributor.New() creates no internal Kafka tee.
if t.Cfg.DataObj.Enabled && t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory {
reg := prometheus.DefaultRegisterer
logger := log.With(util_log.Logger, "component", "inmemory-dataobj-tee")
inmemTee := distributor.NewInMemoryDataObjTee(t.dataObjConsumer.RecordsChannel(), reg, logger, t.Cfg.Distributor.InMemoryPushTimeout)
t.Tee = distributor.WrapTee(t.Tee, inmemTee)
}
// Add ingestion policy interceptors to ingester client
t.Cfg.IngesterClient.GRPCUnaryClientInterceptors = append(
t.Cfg.IngesterClient.GRPCUnaryClientInterceptors,
@ -2379,6 +2388,34 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) {
return nil, err
}
if t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory {
level.Warn(util_log.Logger).Log("msg", "inmemory ingest mode is experimental — no durability guarantees, single-replica only; each replica holds independent data")
level.Info(util_log.Logger).Log("msg", "initializing inmemory dataobj consumer")
svc, err := consumer.NewInMemory(
t.Cfg.DataObj.Consumer,
t.Cfg.DataObj.Metastore,
store,
t.scratchStore,
prometheus.DefaultRegisterer,
util_log.Logger,
)
if err != nil {
return nil, err
}
t.dataObjConsumer = svc
// Register the flush endpoint for inmemory mode (testing/operational use).
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
)
t.Server.HTTP.
Methods(http.MethodPost).
Path("/dataobj-consumer/flush").
Handler(httpMiddleware.Wrap(http.HandlerFunc(t.dataObjConsumer.FlushHandler)))
return svc, nil
}
t.Cfg.DataObj.Consumer.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
level.Info(util_log.Logger).Log("msg", "initializing dataobj consumer", "instance", t.Cfg.Ingester.LifecyclerConfig.ID)
@ -2417,6 +2454,10 @@ func (t *Loki) initDataObjIndexBuilder() (services.Service, error) {
if !t.Cfg.DataObj.Enabled {
return nil, nil
}
if t.Cfg.DataObj.Consumer.IngestMode == consumer.IngestModeInMemory {
level.Info(util_log.Logger).Log("msg", "skipping dataobj index builder in inmemory mode; label queries will use full dataobj scan")
return nil, nil
}
store, err := t.getDataObjBucket("dataobj-index-builder")
if err != nil {
return nil, err

Loading…
Cancel
Save