mirror of https://github.com/grafana/loki
chore: single binary without Kafka (#21318)
Signed-off-by: Shantanu Alshi <shantanu.alshi@grafana.com> Co-authored-by: Christian Haudum <christian.haudum@gmail.com>pull/18598/merge
parent
904c963d17
commit
368583e778
@ -0,0 +1,117 @@ |
||||
# Single-binary Loki with dataobjects, no Kafka. |
||||
# Writes go to the in-process stream sink -> dataobj consumer -> object storage. |
||||
# No ingester/chunk writes; query via v2 engine (dataobjects). |
||||
|
||||
auth_enabled: false |
||||
|
||||
server: |
||||
http_listen_port: 3100 |
||||
grpc_listen_port: 9096 |
||||
log_level: info |
||||
grpc_server_max_concurrent_streams: 1000 |
||||
|
||||
common: |
||||
instance_addr: 127.0.0.1 |
||||
path_prefix: /tmp/loki |
||||
scratch_path: /tmp/loki/scratch |
||||
storage: |
||||
filesystem: |
||||
chunks_directory: /tmp/loki/chunks |
||||
rules_directory: /tmp/loki/rules |
||||
replication_factor: 1 |
||||
ring: |
||||
kvstore: |
||||
store: inmemory |
||||
|
||||
# Dataobj: in-memory ingest (no Kafka) |
||||
dataobj: |
||||
enabled: true |
||||
storage_bucket_prefix: "dataobj/" |
||||
consumer: |
||||
ingest_mode: inmemory |
||||
idle_flush_timeout: 2m |
||||
max_builder_age: 5m |
||||
# Optional tuning keys: inmemory_channel_capacity (e.g. 10000) and inmemory_push_timeout (e.g. 5s).
||||
metastore: |
||||
index_storage_prefix: "index/v0" |
||||
partition_ratio: 10 |
||||
|
||||
# No Kafka |
||||
# kafka_config is omitted; not used when ingest_mode=inmemory |
||||
|
||||
# Querier |
||||
querier: |
||||
query_partition_ingesters: false |
||||
|
||||
# V2 query engine: use dataobjects for all data (storage_lag=0 so no cutoff to chunks) |
||||
query_engine: |
||||
enable: true |
||||
enable_engine_router: false |
||||
storage_lag: 0 |
||||
|
||||
# Ingest limits off (they use Kafka) |
||||
ingest_limits: |
||||
enabled: false |
||||
|
||||
ingest_limits_frontend: |
||||
num_partitions: 1 |
||||
lifecycler: |
||||
ring: |
||||
kvstore: |
||||
store: inmemory |
||||
replication_factor: 1 |
||||
|
||||
# Ingester not used for writes (distributor sends only to dataobj stream sink) |
||||
ingester: |
||||
wal: |
||||
enabled: false |
||||
kafka_ingestion: |
||||
enabled: false |
||||
lifecycler: |
||||
id: "loki.local" |
||||
|
||||
# Distributor: only dataobj in-memory sink (no Kafka, no ingester) |
||||
distributor: |
||||
kafka_writes_enabled: false |
||||
ingester_writes_enabled: false |
||||
ingest_limits_enabled: false |
||||
ingest_limits_dry_run_enabled: false |
||||
|
||||
# Query range |
||||
query_range: |
||||
cache_volume_results: false |
||||
results_cache: |
||||
cache: |
||||
embedded_cache: |
||||
enabled: false |
||||
max_size_mb: 100 |
||||
|
||||
limits_config: |
||||
metric_aggregation_enabled: true |
||||
max_global_streams_per_user: 1000 |
||||
|
||||
# Schema: TSDB + filesystem object store (dataobj uses this for objects/index) |
||||
schema_config: |
||||
configs: |
||||
- from: 2020-10-24 |
||||
store: tsdb |
||||
object_store: filesystem |
||||
schema: v13 |
||||
index: |
||||
prefix: index_ |
||||
period: 24h |
||||
|
||||
storage_config: |
||||
object_store: |
||||
filesystem: |
||||
dir: /tmp/loki |
||||
|
||||
# Disabled to avoid internal push load while testing single-binary dataobj |
||||
pattern_ingester: |
||||
enabled: false |
||||
|
||||
ruler: |
||||
alertmanager_url: http://localhost:9093 |
||||
|
||||
frontend: |
||||
encoding: protobuf |
||||
@ -0,0 +1,111 @@ |
||||
//go:build integration
|
||||
|
||||
package integration |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"os" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/integration/client" |
||||
"github.com/grafana/loki/v3/integration/cluster" |
||||
) |
||||
|
||||
func TestInmemoryPipeline(t *testing.T) { |
||||
clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) { |
||||
c.SetSchemaVer("v13") |
||||
}) |
||||
defer func() { |
||||
assert.NoError(t, clu.Cleanup()) |
||||
}() |
||||
|
||||
tAll := clu.AddComponent( |
||||
"all", |
||||
"-target=all", |
||||
"-dataobj.enabled=true", |
||||
"-dataobj-consumer.ingest-mode=inmemory", |
||||
"-dataobj-consumer.target-page-size=2KB", |
||||
"-dataobj-consumer.target-builder-memory-limit=1MB", |
||||
"-dataobj-consumer.buffer-size=256KB", |
||||
"-dataobj-consumer.target-section-size=512KB", |
||||
"-dataobj-consumer.section-stripe-merge-limit=2", |
||||
"-dataobj-consumer.sha-prefix-size=2", |
||||
"-dataobj-consumer.idle-flush-timeout=100ms", |
||||
"-distributor.ingester-writes-enabled=false", |
||||
"-pattern-ingester.enabled=false", |
||||
"-query-engine.enable=true", |
||||
"-query-engine.storage-lag=0", |
||||
) |
||||
scratchPath := tAll.ClusterSharedPath() + "/scratch" |
||||
require.NoError(t, os.MkdirAll(scratchPath, 0o755)) |
||||
tAll.WithExtraConfig(fmt.Sprintf(` |
||||
common: |
||||
scratch_path: %s |
||||
ring: |
||||
kvstore: |
||||
store: inmemory |
||||
ingest_limits: |
||||
enabled: false |
||||
`, scratchPath)) |
||||
|
||||
require.NoError(t, clu.Run()) |
||||
|
||||
tenantID := randStringRunes() |
||||
cli := client.New(tenantID, "", tAll.HTTPURL()) |
||||
cli.Now = time.Now().Add(-30 * time.Second) |
||||
|
||||
t.Run("round-trip", func(t *testing.T) { |
||||
require.NoError(t, cli.PushLogLine("pipeline-line-1", cli.Now, nil, map[string]string{"job": "pipeline-test"})) |
||||
require.NoError(t, cli.PushLogLine("pipeline-line-2", cli.Now.Add(-time.Second), nil, map[string]string{"job": "pipeline-test"})) |
||||
|
||||
require.NoError(t, cli.FlushDataobj()) |
||||
|
||||
var lines []string |
||||
require.Eventually(t, func() bool { |
||||
qresp, err := cli.RunRangeQuery(context.Background(), `{job="pipeline-test"}`) |
||||
if err != nil { |
||||
return false |
||||
} |
||||
lines = nil |
||||
for _, stream := range qresp.Data.Stream { |
||||
for _, val := range stream.Values { |
||||
lines = append(lines, val[1]) |
||||
} |
||||
} |
||||
return len(lines) >= 2 |
||||
}, 30*time.Second, 100*time.Millisecond, "pushed logs never appeared in query results") |
||||
|
||||
assert.ElementsMatch(t, []string{"pipeline-line-1", "pipeline-line-2"}, lines) |
||||
}) |
||||
|
||||
t.Run("idle-flush-timeout", func(t *testing.T) { |
||||
tenantIdle := randStringRunes() |
||||
cliIdle := client.New(tenantIdle, "", tAll.HTTPURL()) |
||||
cliIdle.Now = time.Now() |
||||
|
||||
require.NoError(t, cliIdle.PushLogLine("idle-flush-line", cliIdle.Now, nil, map[string]string{"job": "idle-test"})) |
||||
|
||||
// Do NOT call the flush endpoint — let the idle flush timeout (100ms) trigger naturally.
|
||||
var lines []string |
||||
require.Eventually(t, func() bool { |
||||
qresp, err := cliIdle.RunRangeQuery(context.Background(), `{job="idle-test"}`) |
||||
if err != nil { |
||||
return false |
||||
} |
||||
lines = nil |
||||
for _, stream := range qresp.Data.Stream { |
||||
for _, val := range stream.Values { |
||||
lines = append(lines, val[1]) |
||||
} |
||||
} |
||||
return len(lines) >= 1 |
||||
}, 15*time.Second, 200*time.Millisecond, "idle flush did not trigger within timeout") |
||||
|
||||
assert.Contains(t, lines, "idle-flush-line") |
||||
}) |
||||
} |
||||
@ -0,0 +1,58 @@ |
||||
package consumer |
||||
|
||||
import ( |
||||
"bytes" |
||||
"strings" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/stretchr/testify/require" |
||||
"github.com/twmb/franz-go/pkg/kgo" |
||||
) |
||||
|
||||
// TestProcessor_Stopping_Drain verifies that processor.stopping() drains
|
||||
// buffered records from the channel and logs the correct outcome.
|
||||
func TestProcessor_Stopping_Drain(t *testing.T) { |
||||
t.Run("empty channel logs clean drain", func(t *testing.T) { |
||||
var buf bytes.Buffer |
||||
logger := log.NewLogfmtLogger(&buf) |
||||
|
||||
reg := prometheus.NewRegistry() |
||||
builder := newTestBuilder(t, reg) |
||||
fc := &mockFlushCommitter{} |
||||
ch := make(chan *kgo.Record, 10) |
||||
|
||||
proc := newProcessor(builder, ch, fc, time.Hour, time.Hour, logger, reg) |
||||
|
||||
err := proc.stopping(nil) |
||||
require.NoError(t, err) |
||||
require.Contains(t, buf.String(), "inmemory channel drained cleanly on shutdown") |
||||
}) |
||||
|
||||
t.Run("records drained and logged clean", func(t *testing.T) { |
||||
var buf bytes.Buffer |
||||
logger := log.NewLogfmtLogger(&buf) |
||||
|
||||
reg := prometheus.NewRegistry() |
||||
builder := newTestBuilder(t, reg) |
||||
fc := &mockFlushCommitter{} |
||||
ch := make(chan *kgo.Record, 10) |
||||
|
||||
// Pre-fill 3 records into channel.
|
||||
now := time.Now() |
||||
for i := 0; i < 3; i++ { |
||||
ch <- newTestRecord(t, "tenant1", now.Add(time.Duration(i)*time.Second)) |
||||
} |
||||
|
||||
proc := newProcessor(builder, ch, fc, time.Hour, time.Hour, logger, reg) |
||||
|
||||
err := proc.stopping(nil) |
||||
require.NoError(t, err) |
||||
require.True(t, |
||||
strings.Contains(buf.String(), "inmemory channel drained cleanly on shutdown"), |
||||
"expected clean drain log, got: %s", buf.String(), |
||||
) |
||||
}) |
||||
} |
||||
@ -0,0 +1,117 @@ |
||||
package distributor |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promauto" |
||||
"github.com/twmb/franz-go/pkg/kgo" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/kafka" |
||||
) |
||||
|
||||
// Defaults applied to every InMemoryDataObjTee by its constructor.
const (
	// inmemoryTeeDefaultTopic is the topic name passed to the record encoder.
	inmemoryTeeDefaultTopic = "loki"

	// inmemoryTeeDefaultMaxBufferedBytes is the size limit (100 MiB) passed to
	// the record encoder.
	inmemoryTeeDefaultMaxBufferedBytes = 100 * 1024 * 1024
)
||||
|
||||
// InMemoryDataObjTee implements the [Tee] interface and sends encoded log
|
||||
// records to an in-process channel instead of to Kafka. It is used in
|
||||
// single-binary mode with ingest_mode=inmemory.
|
||||
type InMemoryDataObjTee struct { |
||||
records chan *kgo.Record |
||||
topic string |
||||
maxBufferedBytes int |
||||
pushTimeout time.Duration |
||||
logger log.Logger |
||||
|
||||
streams prometheus.Counter |
||||
streamFailures *prometheus.CounterVec |
||||
} |
||||
|
||||
// NewInMemoryDataObjTee returns a new InMemoryDataObjTee that sends encoded
|
||||
// records to the given channel. reg and logger may be nil.
|
||||
func NewInMemoryDataObjTee(records chan *kgo.Record, reg prometheus.Registerer, logger log.Logger, pushTimeout time.Duration) *InMemoryDataObjTee { |
||||
if logger == nil { |
||||
logger = log.NewNopLogger() |
||||
} |
||||
return &InMemoryDataObjTee{ |
||||
records: records, |
||||
topic: inmemoryTeeDefaultTopic, |
||||
maxBufferedBytes: inmemoryTeeDefaultMaxBufferedBytes, |
||||
pushTimeout: pushTimeout, |
||||
logger: logger, |
||||
streams: promauto.With(reg).NewCounter(prometheus.CounterOpts{ |
||||
Name: "loki_distributor_inmemory_dataobj_tee_streams_total", |
||||
Help: "Total number of streams duplicated (both successful and failed) to the in-memory dataobj channel.", |
||||
}), |
||||
streamFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ |
||||
Name: "loki_distributor_inmemory_dataobj_tee_stream_failures_total", |
||||
Help: "Total number of streams that could not be duplicated to the in-memory dataobj channel.", |
||||
}, []string{"reason"}), |
||||
} |
||||
} |
||||
|
||||
// Register implements [Tee]. It adds all streams to the pushTracker's pending count
|
||||
// so the distributor waits for them before concluding a push request.
|
||||
func (t *InMemoryDataObjTee) Register(_ context.Context, _ string, streams []KeyedStream, pushTracker *PushTracker) { |
||||
pushTracker.streamsPending.Add(int32(len(streams))) |
||||
} |
||||
|
||||
// Duplicate implements [Tee]. It encodes each stream and sends it to the
|
||||
// in-process channel, calling pushTracker.doneWithResult for each stream.
|
||||
func (t *InMemoryDataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream, pushTracker *PushTracker) { |
||||
go func() { |
||||
for _, s := range streams { |
||||
t.duplicate(ctx, tenant, s, pushTracker) |
||||
} |
||||
}() |
||||
} |
||||
|
||||
func (t *InMemoryDataObjTee) duplicate(ctx context.Context, tenant string, stream KeyedStream, pushTracker *PushTracker) { |
||||
t.streams.Inc() |
||||
|
||||
records, err := kafka.EncodeWithTopic(t.topic, 0, tenant, stream.Stream, t.maxBufferedBytes) |
||||
if err != nil { |
||||
level.Error(t.logger).Log("msg", "failed to encode stream for in-memory tee", "err", err) |
||||
t.streamFailures.WithLabelValues("encode_error").Inc() |
||||
pushTracker.doneWithResult(fmt.Errorf("couldn't process request internally due to inmemory tee error: %d", TeeCouldntEncodeStreamError)) |
||||
return |
||||
} |
||||
|
||||
// Single timer for the whole stream batch. Using time.NewTimer + defer Stop
|
||||
// avoids the leak caused by time.After inside a loop (each call creates a timer
|
||||
// that lives until it fires, even if the select chose a different case).
|
||||
// A nil channel blocks forever, so timeout stays nil when pushTimeout == 0.
|
||||
//
|
||||
// Note: if the channel send times out mid-batch, earlier records from this
|
||||
// stream are already queued. The consumer will process them as a partial
|
||||
// stream. This is acceptable in inmemory mode (no durability guarantees).
|
||||
var timeout <-chan time.Time |
||||
if t.pushTimeout > 0 { |
||||
timer := time.NewTimer(t.pushTimeout) |
||||
defer timer.Stop() |
||||
timeout = timer.C |
||||
} |
||||
|
||||
for _, rec := range records { |
||||
select { |
||||
case t.records <- rec: |
||||
case <-timeout: |
||||
level.Error(t.logger).Log("msg", "in-memory dataobj tee channel full, dropping record", "tenant", tenant) |
||||
t.streamFailures.WithLabelValues("channel_full").Inc() |
||||
pushTracker.doneWithResult(fmt.Errorf("couldn't process request internally due to inmemory tee error: %d", TeeCouldntProduceRecordsError)) |
||||
return |
||||
case <-ctx.Done(): |
||||
t.streamFailures.WithLabelValues("cancellation").Inc() |
||||
pushTracker.doneWithResult(ctx.Err()) |
||||
return |
||||
} |
||||
} |
||||
|
||||
pushTracker.doneWithResult(nil) |
||||
} |
||||
@ -0,0 +1,71 @@ |
||||
package distributor |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/twmb/franz-go/pkg/kgo" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/logproto" |
||||
) |
||||
|
||||
func newTestInMemoryTee(capacity int) (*InMemoryDataObjTee, chan *kgo.Record) { |
||||
ch := make(chan *kgo.Record, capacity) |
||||
tee := NewInMemoryDataObjTee(ch, prometheus.NewRegistry(), nil, 5*time.Second) |
||||
return tee, ch |
||||
} |
||||
|
||||
func TestInMemoryDataObjTee_Register_AddsPending(t *testing.T) { |
||||
tee, _ := newTestInMemoryTee(10) |
||||
ctx := context.Background() |
||||
|
||||
streams := []KeyedStream{ |
||||
{Stream: logproto.Stream{Labels: `{a="b"}`}}, |
||||
{Stream: logproto.Stream{Labels: `{c="d"}`}}, |
||||
{Stream: logproto.Stream{Labels: `{e="f"}`}}, |
||||
} |
||||
pushTracker := &PushTracker{ |
||||
done: make(chan struct{}, 1), |
||||
err: make(chan error, 1), |
||||
} |
||||
|
||||
tee.Register(ctx, "tenant", streams, pushTracker) |
||||
assert.EqualValues(t, 3, pushTracker.streamsPending.Load()) |
||||
} |
||||
|
||||
func TestInMemoryDataObjTee_Duplicate_SendsRecords(t *testing.T) { |
||||
tee, ch := newTestInMemoryTee(100) |
||||
ctx := context.Background() |
||||
tenant := "test-tenant" |
||||
now := time.Now() |
||||
|
||||
streams := []KeyedStream{ |
||||
{ |
||||
Stream: logproto.Stream{ |
||||
Labels: `{job="test"}`, |
||||
Entries: []logproto.Entry{ |
||||
{Timestamp: now, Line: "line1"}, |
||||
{Timestamp: now.Add(time.Second), Line: "line2"}, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
pushTracker := &PushTracker{ |
||||
done: make(chan struct{}, 1), |
||||
err: make(chan error, 1), |
||||
} |
||||
pushTracker.streamsPending.Store(1) |
||||
|
||||
tee.Duplicate(ctx, tenant, streams, pushTracker) |
||||
|
||||
// Wait for at least one record to arrive.
|
||||
select { |
||||
case rec := <-ch: |
||||
assert.NotNil(t, rec) |
||||
case <-time.After(5 * time.Second): |
||||
t.Fatal("timed out waiting for record") |
||||
} |
||||
} |
||||
Loading…
Reference in new issue