Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/caching.go

445 lines
14 KiB

package executor
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
arrowcodec "github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire/arrow"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/xcap"
)
// CacheStats bundles the xcap statistics recorded by a [cachingPipeline].
// Using a struct lets different cache types record to separate stat variables.
type CacheStats struct {
	Hits    *xcap.StatisticInt64 // incremented once per Open that finds a decodable entry
	Misses  *xcap.StatisticInt64 // incremented once per Open that misses (or fails to decode)
	Batches *xcap.StatisticInt64 // record batches served from / stored into the cache
	Rows    *xcap.StatisticInt64 // total rows across those batches
	Bytes   *xcap.StatisticInt64 // encoded payload size in bytes
}
// newCachingPipeline wraps inner in a cachingPipeline backed by c.
// If c is nil, inner is returned unchanged (no caching).
func newCachingPipeline(
	c cache.Cache,
	inner Pipeline,
	key string,
	maxSizeBytes uint64,
	compression string,
	logger log.Logger,
	stats CacheStats,
	cacheName string,
) Pipeline {
	if c == nil {
		// No cache configured: nothing to wrap.
		return inner
	}

	hashed := cache.HashKey(key)
	p := &cachingPipeline{
		inner:        inner,
		cache:        c,
		key:          hashed,
		maxSizeBytes: maxSizeBytes,
		compression:  compression,
		stats:        stats,
	}
	p.logger = log.With(logger, "pipeline", "caching", "cache", cacheName, "key", hashed)
	return p
}
// cachingPipeline wraps a Pipeline and transparently stores and retrieves Arrow
// record batch results via a cache.Cache.
//
// On a cache hit: Open decodes the cached payload into a CacheEntryDecoder; Read
// iterates records through it without touching the inner pipeline.
// On a cache miss: Read feeds records through a CacheEntryEncoder; on EOF the
// encoded payload is committed to the cache.
type cachingPipeline struct {
	inner        Pipeline    // wrapped pipeline, opened and read only on the miss path
	cache        cache.Cache // backing store for encoded results
	key          string      // hashed cache key for this task's result
	logger       log.Logger
	maxSizeBytes uint64     // max encoded size to cache; 0 means only empty results are cached
	compression  string     // codec name handed to the encoder (e.g. "snappy")
	stats        CacheStats // stat variables this pipeline records hits/misses/sizes to
	hit          bool       // true once Open found a decodable cache entry
	// For hit path
	decoder *CacheEntryDecoder // non-nil on hit path
	// For miss path
	passthrough bool // true once we know we won't cache (size overflow)
	encoder     *CacheEntryEncoder
	// Accumulated across whichever path is active
	cachedRows    int64
	cachedRecords int64
}
// Open implements Pipeline.
//
// It first tries to fetch a cached entry for the key; on a decodable hit the
// inner pipeline is never opened. On any fetch/decode failure the pipeline
// falls back to opening the inner pipeline and encoding its output.
func (p *cachingPipeline) Open(ctx context.Context) error {
	region := xcap.RegionFromContext(ctx)

	found, buffs, missing, err := p.cache.Fetch(ctx, []string{p.key})
	switch {
	case err != nil:
		level.Error(p.logger).Log("msg", "task cache fetch failed, falling back to inner pipeline", "err", err)
	case len(missing) == 0 && len(found) > 0:
		dec, decErr := NewCacheEntryDecoder(buffs[0])
		if decErr == nil {
			p.decoder = dec
			p.hit = true
			region.Record(p.stats.Hits.Observe(1))
			region.Record(p.stats.Bytes.Observe(int64(len(buffs[0]))))
			level.Debug(p.logger).Log("msg", "task cache hit", "key", p.key, "records", p.decoder.Len())
			return nil
		}
		// Corrupted or old-format entry — fall back to inner pipeline.
		level.Error(p.logger).Log("msg", "cache decode failed, falling back to inner pipeline", "err", decErr)
	}

	region.Record(p.stats.Misses.Observe(1))
	p.encoder = NewCacheEntryEncoder(p.compression)
	level.Debug(p.logger).Log("msg", "cache miss", "key", p.key)
	return p.inner.Open(ctx)
}
// Read implements Pipeline.
//
// On the hit path it streams decoded records from the cached payload; on the
// miss path it reads from the inner pipeline while encoding each record, and
// commits the encoded payload to the cache once the inner pipeline returns
// EOF. If the encoded size exceeds maxSizeBytes the pipeline degrades to a
// plain passthrough and nothing is cached.
func (p *cachingPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	if p.hit {
		rec, err := p.decoder.Next()
		if rec != nil {
			p.cachedRows += rec.NumRows()
			p.cachedRecords++
		}
		if errors.Is(err, EOF) {
			region := xcap.RegionFromContext(ctx)
			// Record to this pipeline's configured stats, not the global
			// task-cache variables: CacheStats exists so cache types with
			// their own stat variables (e.g. the dataobjscan result cache)
			// are attributed correctly. This mirrors the miss path below.
			region.Record(p.stats.Batches.Observe(p.cachedRecords))
			region.Record(p.stats.Rows.Observe(p.cachedRows))
		}
		return rec, err
	}
	rec, err := p.inner.Read(ctx)
	if err != nil {
		// Won't cache on a non-EOF error, or once caching was disabled.
		if !errors.Is(err, EOF) || p.passthrough {
			return nil, err
		}
		// Inner pipeline is exhausted: commit the accumulated payload.
		payload, commitErr := p.encoder.Commit()
		if commitErr != nil {
			level.Error(p.logger).Log("msg", "failed to encode records for cache", "err", commitErr)
			return nil, err
		}
		if storeErr := p.cache.Store(ctx, []string{p.key}, [][]byte{payload}); storeErr != nil {
			level.Error(p.logger).Log("msg", "failed to store results in cache", "err", storeErr)
			return nil, err
		}
		region := xcap.RegionFromContext(ctx)
		region.Record(p.stats.Batches.Observe(p.cachedRecords))
		region.Record(p.stats.Rows.Observe(p.cachedRows))
		region.Record(p.stats.Bytes.Observe(int64(len(payload))))
		return nil, err
	}
	// When passthrough is enabled, we won't cache this response.
	if p.passthrough {
		return rec, nil
	}
	// maxSizeBytes==0 means only cache empty responses; bail on the first record with rows.
	if p.maxSizeBytes == 0 && rec.NumRows() > 0 {
		p.disableCache()
		return rec, nil
	}
	if appendErr := p.encoder.Append(rec); appendErr != nil {
		level.Error(p.logger).Log("msg", "failed to encode record for cache, skipping cache", "err", appendErr)
		p.disableCache()
		return rec, nil
	}
	p.cachedRows += rec.NumRows()
	p.cachedRecords++
	// Adding this last record made us go over the max cacheable size, so disable caching for this task result.
	if p.encoder.Size() > p.maxSizeBytes {
		level.Debug(p.logger).Log("msg", "cache entry too large, skipping cache", "size", p.encoder.Size(), "max_size", p.maxSizeBytes)
		p.disableCache()
	}
	return rec, nil
}
// disableCache switches the pipeline into passthrough mode: the encoder's
// accumulated frames are dropped (freeing their memory) and the cached-stat
// counters are zeroed.
func (p *cachingPipeline) disableCache() {
	p.passthrough = true
	p.encoder.Reset()
	p.cachedRows = 0
	p.cachedRecords = 0
}
// Close implements Pipeline.
func (p *cachingPipeline) Close() {
	if p.hit {
		// On the hit path the inner pipeline was never opened.
		return
	}
	p.inner.Close()
}
// TaskCacheRegistry maps TaskCacheType identifiers to backing cache stores and their stats.
type TaskCacheRegistry struct {
	caches map[physical.TaskCacheName]cache.Cache // one independent cache per task type
	stats  map[physical.TaskCacheName]CacheStats  // stat variables pipelines record to for that cache
}
// NewTaskCacheRegistry builds a registry with one independent cache per task type,
// all backed by the same resultscache.Config. Returns a zero-value (no-op) registry
// when caching is not configured.
func NewTaskCacheRegistry(cfg resultscache.Config, reg prometheus.Registerer, logger log.Logger) (TaskCacheRegistry, error) {
	if !cache.IsCacheConfigured(cfg.CacheConfig) {
		return TaskCacheRegistry{}, nil
	}

	// newCache creates an independent cache whose keys are namespaced by name.
	// The error is wrapped once here with the cache name, so call sites don't
	// need to wrap it again.
	newCache := func(name string) (cache.Cache, error) {
		cfgCopy := cfg.CacheConfig
		cfgCopy.Prefix += name + "."
		c, err := cache.New(cfgCopy, reg, logger, stats.TaskResultCache, constants.Loki)
		if err != nil {
			return nil, fmt.Errorf("creating %s task cache: %w", name, err)
		}
		return c, nil
	}

	logscan, err := newCache("logscan")
	if err != nil {
		return TaskCacheRegistry{}, err
	}
	metastore, err := newCache("metastore")
	if err != nil {
		return TaskCacheRegistry{}, err
	}
	logscanRangeAggr, err := newCache("logscan-rangeaggr")
	if err != nil {
		return TaskCacheRegistry{}, err
	}
	dataObjScanResult, err := newCache("dataobjscan-result")
	if err != nil {
		return TaskCacheRegistry{}, err
	}

	// All task caches except the dataobjscan result cache share the generic
	// task-cache stat variables; the dataobjscan result cache has its own set.
	taskCacheStats := CacheStats{
		Hits:    xcap.TaskCacheHits,
		Misses:  xcap.TaskCacheMisses,
		Batches: xcap.TaskCacheBatches,
		Rows:    xcap.TaskCacheRows,
		Bytes:   xcap.TaskCacheBytes,
	}
	dataObjScanCacheStats := CacheStats{
		Hits:    xcap.DataObjScanCacheHits,
		Misses:  xcap.DataObjScanCacheMisses,
		Batches: xcap.DataObjScanCacheBatches,
		Rows:    xcap.DataObjScanCacheRows,
		Bytes:   xcap.DataObjScanCacheBytes,
	}

	return TaskCacheRegistry{
		caches: map[physical.TaskCacheName]cache.Cache{
			physical.TaskCacheLogsScan:          logscan,
			physical.TaskCacheLogsScanRangeAggr: logscanRangeAggr,
			physical.TaskCacheMetastore:         metastore,
			physical.TaskCacheDataObjScanResult: dataObjScanResult,
		},
		stats: map[physical.TaskCacheName]CacheStats{
			physical.TaskCacheLogsScan:          taskCacheStats,
			physical.TaskCacheLogsScanRangeAggr: taskCacheStats,
			physical.TaskCacheMetastore:         taskCacheStats,
			physical.TaskCacheDataObjScanResult: dataObjScanCacheStats,
		},
	}, nil
}
// NewTestTaskCacheRegistry builds a TaskCacheRegistry backed by the provided
// cache map. Intended for use in tests that need to pre-populate cache entries.
func NewTestTaskCacheRegistry(caches map[physical.TaskCacheName]cache.Cache) TaskCacheRegistry {
	r := TaskCacheRegistry{caches: caches}
	r.stats = make(map[physical.TaskCacheName]CacheStats)
	return r
}
// GetForType returns the raw cache backend for the given cache type.
func (r TaskCacheRegistry) GetForType(cacheType physical.TaskCacheName) (cache.Cache, CacheStats, error) {
	c, ok := r.caches[cacheType]
	if !ok {
		return nil, CacheStats{}, fmt.Errorf("no cache registered for type %q", cacheType)
	}
	return c, r.stats[cacheType], nil
}
// Compression codec identifiers stored in the per-record wire format.
const (
	compressionNone   byte = 0
	compressionSnappy byte = 1
)

// CacheEntryEncoder accumulates Arrow record batches and encodes them incrementally.
// Each record is compressed independently, allowing callers to check Size() after
// every Append and short-circuit before the full payload is committed.
type CacheEntryEncoder struct {
	comp   byte
	frames [][]byte // per-record compressed IPC bytes
	size   uint64   // sum of len(frame) across all frames
}

// NewCacheEntryEncoder returns an encoder using the requested compression.
// "snappy" (matched case-insensitively) selects snappy; anything else means
// no compression.
func NewCacheEntryEncoder(compression string) *CacheEntryEncoder {
	e := &CacheEntryEncoder{comp: compressionNone}
	if strings.EqualFold(compression, "snappy") {
		e.comp = compressionSnappy
	}
	return e
}
// Append serializes rec, optionally compresses it, and stores the resulting frame.
// Batches with zero rows are skipped entirely and produce no frame.
func (e *CacheEntryEncoder) Append(rec arrow.RecordBatch) error {
	// Skip empty batches
	if rec.NumRows() == 0 {
		return nil
	}

	frame, err := arrowcodec.DefaultArrowCodec.SerializeArrowRecord(rec)
	if err != nil {
		return fmt.Errorf("serializing record: %w", err)
	}
	if e.comp == compressionSnappy {
		frame = snappy.Encode(nil, frame)
	}

	e.size += uint64(len(frame))
	e.frames = append(e.frames, frame)
	return nil
}
// Size returns the total byte size of all encoded frames accumulated so far.
// This does not include the fixed-size buffer header.
func (e *CacheEntryEncoder) Size() uint64 {
	return e.size
}
// Commit serializes all accumulated frames into a single framed buffer.
//
// Wire format:
//
//	[8 bytes: record (aka frames) count (big-endian uint64)]
//	[1 byte: compression codec (0=none, 1=snappy)]
//	for each record:
//	[8 bytes: compressed frame length (big-endian uint64)]
//	[N bytes: compressed Arrow IPC stream]
func (e *CacheEntryEncoder) Commit() ([]byte, error) {
	var out bytes.Buffer
	var scratch [8]byte

	// writeU64 appends v to out as a big-endian uint64.
	writeU64 := func(v uint64) {
		binary.BigEndian.PutUint64(scratch[:], v)
		out.Write(scratch[:])
	}

	writeU64(uint64(len(e.frames)))
	out.WriteByte(e.comp)
	for _, frame := range e.frames {
		writeU64(uint64(len(frame)))
		out.Write(frame)
	}
	return out.Bytes(), nil
}
// Reset discards all accumulated frames and resets counters, freeing their memory.
func (e *CacheEntryEncoder) Reset() {
	e.size = 0
	e.frames = nil
}
// CacheEntryDecoder iterates over a framed buffer produced by [CacheEntryEncoder.Commit].
type CacheEntryDecoder struct {
data []byte
pos int
n int // total record count from header
comp byte // compression codec from header
read int // records consumed so far
}
// NewCacheEntryDecoder parses the buffer header and returns a ready decoder.
// A zero-length buffer represents an empty cached result (n=0).
func NewCacheEntryDecoder(data []byte) (*CacheEntryDecoder, error) {
if len(data) == 0 {
return &CacheEntryDecoder{}, nil
}
if len(data) < 9 {
return nil, fmt.Errorf("cache buffer too short (%d bytes)", len(data))
}
n := int(binary.BigEndian.Uint64(data[:8]))
comp := data[8]
return &CacheEntryDecoder{data: data, pos: 9, n: n, comp: comp}, nil
}
// Len returns the total number of records declared in the buffer header.
func (d *CacheEntryDecoder) Len() int { return d.n }
// Next returns the next Arrow record batch from the buffer.
// Returns (nil, [EOF]) when all records have been consumed.
func (d *CacheEntryDecoder) Next() (arrow.RecordBatch, error) {
	if d.read >= d.n {
		return nil, EOF
	}

	idx := d.read // zero-based index of the record being decoded (for errors)
	remaining := len(d.data) - d.pos
	if remaining <= 0 {
		return nil, fmt.Errorf("unexpected end of cache buffer at record %d", idx)
	}
	if remaining < 8 {
		return nil, fmt.Errorf("truncated length header for record %d", idx)
	}

	frameLen := int(binary.BigEndian.Uint64(d.data[d.pos : d.pos+8]))
	d.pos += 8
	if avail := len(d.data) - d.pos; frameLen > avail {
		return nil, fmt.Errorf("truncated data for record %d (want %d bytes, have %d)", idx, frameLen, avail)
	}
	frame := d.data[d.pos : d.pos+frameLen]
	d.pos += frameLen
	d.read++

	ipcBytes := frame
	if d.comp == compressionSnappy {
		decoded, err := snappy.Decode(nil, frame)
		if err != nil {
			return nil, fmt.Errorf("snappy-decode record %d: %w", idx, err)
		}
		ipcBytes = decoded
	} else if d.comp != compressionNone {
		return nil, fmt.Errorf("unknown compression codec %d for record %d", d.comp, idx)
	}

	rec, err := arrowcodec.DefaultArrowCodec.DeserializeArrowRecord(ipcBytes)
	if err != nil {
		return nil, fmt.Errorf("deserializing record %d: %w", idx, err)
	}
	return rec, nil
}