Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj
Ivan Kalita 08e3c4385f
feat(metastore): shard sections queries over index files (#20134)
4 days ago
..
config chore(dataobj): Use common base config for indexobj and logsobj builders (#20037) 4 weeks ago
consumer chore: better comments when end offset is zero (#20196) 2 weeks ago
explorer
index chore: Define an E2E ingestion processing time metric (#20054) 4 weeks ago
internal chore(xcap): adds exporter to summarise capture as a structured log line (#20099) 3 weeks ago
metastore feat(metastore): shard sections queries over index files (#20134) 4 days ago
sections feat(metastore): use arrow for scanning and blooms (#20234) 1 week ago
tools
uploader
README.md chore: Add a dataobj readme (#19910) 1 month ago
builder.go
builder_test.go
dataobj.go
decoder.go
decoder_metadata.go
encoder.go
encoder_test.go
metrics.go
range_reader.go
section.go
section_reader.go
snapshot.go
snapshot_test.go

README.md

DataObj Package Documentation

Overview

The dataobj package provides a container format for storing and retrieving structured data from object storage. It's designed specifically for Loki's log storage needs, enabling efficient columnar storage and retrieval of log data with support for multiple tenants, metadata indexing, and flexible querying.

Table of Contents


Architecture Overview

The dataobj package provides a hierarchical container format:

┌─────────────────────────────────────┐
│         Data Object File            │
├─────────────────────────────────────┤
│  Header (Magic: "THOR")            │
├─────────────────────────────────────┤
│  Section 1 Data                     │
│  Section 2 Data                     │
│  ...                                │
│  Section N Data                     │
├─────────────────────────────────────┤
│  Section 1 Metadata                 │
│  Section 2 Metadata                 │
│  ...                                │
│  Section N Metadata                 │
├─────────────────────────────────────┤
│  File Metadata (Protobuf)           │
│  - Dictionary                       │
│  - Section Types                    │
│  - Section Layout Info              │
├─────────────────────────────────────┤
│  Footer                             │
│  - File Format Version              │
│  - File Metadata Size (4 bytes)     │
│  - Magic: "THOR"                    │
└─────────────────────────────────────┘

Core Components

  1. Builder - Constructs data objects by accumulating sections
  2. Encoder - Low-level encoding of sections and metadata
  3. Decoder - Low-level decoding of sections from object storage
  4. SectionReader - Interface for reading data/metadata from sections
  5. Section Implementations - Specific section types (logs, streams, pointers, etc.)

Key Concepts

Data Objects

A Data Object is a self-contained file stored in object storage containing:

  • One or more Sections, each holding a specific type of data
  • File Metadata describing all sections and their layout
  • Dictionary for efficient string storage (namespaces, kinds, tenant IDs)

Sections

Sections are the primary organizational unit within a data object:

  • Each section has a type (namespace, kind, version)
  • Sections contain both data and metadata regions
  • Multiple sections of the same type can exist in one object
  • Sections can be tenant-specific

Section Regions

Each section is split into two regions:

  1. Data Region - The actual encoded data (e.g., columnar log data)
  2. Metadata Region - Lightweight metadata to aid in reading the data region

This separation allows reading section metadata without loading the entire data payload.

Columnar Storage

Most section implementations use columnar storage via the internal/dataset package:

  • Data is organized into columns (e.g., timestamp, stream_id, message)
  • Columns are split into pages for efficient random access
  • Pages can be compressed (typically with zstd)
  • Supports predicates for filtering during reads

File Format Structure

File Layout

Offset  | Content
--------|----------------------------------------------------------
0       | Magic bytes: "THOR" (4 bytes)
4       | [Section Data Region - All sections concatenated]
...     | [Section Metadata Region - All sections concatenated]
...     | Format Version (varint)
...     | File Metadata (protobuf-encoded)
-8      | Metadata Size (uint32, little-endian)
-4      | Magic bytes: "THOR" (4 bytes)

File Metadata Structure (Protobuf) can be found in the pkg/dataobj/internal/metadata package.


Encoding Process

High-Level Encoding Flow

┌──────────────┐
│ Log Records  │
└──────┬───────┘
       │
       ▼
┌──────────────────┐
│ Section Builder  │  (e.g., logs.Builder)
│ - Buffer records │
│ - Create stripes │
│ - Merge stripes  │
└──────┬───────────┘
       │
       │ Flush
       ▼
┌──────────────────┐
│ Columnar Encoder │
│ - Encode columns │
│ - Compress pages │
└──────┬───────────┘
       │
       │ WriteSection
       ▼
┌──────────────────┐
│ dataobj.Builder  │
│ - Append section │
│ - Build metadata │
└──────┬───────────┘
       │
       │ Flush
       ▼
┌──────────────────┐
│ Snapshot         │
│ - In-memory/disk │
│ - Ready to upload│
└──────┬───────────┘
       │
       ▼
┌──────────────────┐
│ Object Storage   │
└──────────────────┘

Encoding

1. Section builders

A section builder:

  1. Buffers records in memory up to BufferSize
  2. When buffer is full, sorts records and creates a stripe
  3. Stripes are intermediate compressed tables
  4. Multiple stripes are merged when flushing the section

2. Encoding to Columnar Format

When flushing, the builder:

  1. Converts buffered records into columnar format
  2. Creates separate columns for each field (stream_id, timestamp, metadata, message)
  3. Splits columns into pages
  4. Compresses each page independently (zstd)

3. Writing to Data Object

The data object builder:

  1. Collects data and metadata from each section
  2. Builds file metadata with section layout information

Column metadata includes:

  • Page descriptors (offset, size, row count, compression)
  • Column statistics (min/max values)
  • Encoding information

4. Compression Strategy

The encoding uses a multi-level compression strategy:

  • Stripes (intermediate): zstd with SpeedFastest for quick buffering
  • Sections (final): zstd with SpeedDefault for better compression
  • Ordered Appends: When logs arrive pre-sorted, skips stripe creation for better performance

Decoding Process

High-Level Decoding Flow

┌──────────────────┐
│ Object Storage   │
└──────┬───────────┘
       │
       │ Open
       ▼
┌──────────────────┐
│ dataobj.Object   │
│ - Read metadata  │
│ - Parse sections │
└──────┬───────────┘
       │
       │ Open Section
       ▼
┌──────────────────┐
│ Section (e.g.    │
│  logs.Section)   │
│ - Decode metadata│
│ - List columns   │
└──────┬───────────┘
       │
       │ NewReader
       ▼
┌──────────────────┐
│ logs.Reader      │
│ - Read pages     │
│ - Apply filters  │
│ - Decompress     │
└──────┬───────────┘
       │
       │ Read batches
       ▼
┌──────────────────┐
│ Arrow Records    │
└──────────────────┘

Detailed Decoding Steps

1. Opening a Data Object

Opening process:

  1. Reads last 16KB of file in one request (optimistic read for metadata)
  2. Parses footer to find metadata offset and size
  3. Reads and decodes file metadata
  4. Constructs Section objects with SectionReaders

2. Decoding File Metadata

The decoder:

  1. Validates magic bytes ("THOR")
  2. Reads metadata size from last 8 bytes
  3. Seeks to metadata offset
  4. Decodes format version and protobuf metadata

3. Opening a Section

Section opening:

  1. Validates section type and version
  2. Reads extension data for quick access to section metadata
  3. Decodes section metadata (columnar structure)

4. Reading Data

Reading process:

  1. Validates reader options
  2. Maps predicates to internal dataset predicates
  3. Reads pages in batches (with prefetching)
  4. Applies predicates at page and row level
  5. Converts to Arrow record batches

Reader optimizations:

  • Range coalescing: Adjacent pages are read in a single request
  • Parallel reads: Multiple pages can be fetched concurrently
  • Prefetching: Pages are fetched ahead of time while processing current batch
  • Predicate pushdown: Filters applied at page level using statistics

5. Value Encoding Types

Values in pages can use different encodings:

  • Plain: Raw values stored directly
  • Delta: Store deltas between consecutive values (efficient for sorted data)
  • Bitmap: For repetition levels (null/non-null indicators)

Section Types

The package provides several built-in section types:

1. Logs Section

Namespace: github.com/grafana/loki Kind: logs Location: pkg/dataobj/sections/logs/

Stores log records in columnar format.

Columns:

  • stream_id (int64): Stream identifier
  • timestamp (int64): Nanosecond timestamp
  • metadata (binary): Structured metadata key-value pairs, one column per key
  • message (binary): Log line content

2. Streams Section

Namespace: github.com/grafana/loki Kind: streams Location: pkg/dataobj/sections/streams/

Stores stream metadata and statistics.

Columns:

  • stream_id (int64): Unique stream identifier
  • min_timestamp (int64): Earliest timestamp in stream
  • max_timestamp (int64): Latest timestamp in stream
  • labels (binary): Label key-value pairs for the stream, one column per key
  • rows (uint64): Number of log records
  • uncompressed_size (uint64): Uncompressed data size

3. Pointers Section

Namespace: github.com/grafana/loki Kind: pointers Location: pkg/dataobj/sections/pointers/

Used in index objects to point to other objects that contain logs, via stream or predicate lookup.

Columns:

  • path (binary): Path to data object in object storage
  • section (int64): The section number within the referenced data object
  • pointer_kind (int64): The type of pointer entry. Determines which fields are set in the section. Either stream or column.

// Fields present for a stream pointer type

  • stream_id (int64): Stream identifier in this index object
  • stream_id_ref (int64): Stream identifier in the target data object's stream section
  • min_timestamp (int64): Min timestamp in the object for this stream
  • max_timestamp (int64): Max timestamp in the object for this stream
  • row_count (int64): Total rows for this stream in the target object
  • uncompressed_size (int64): Total bytes for this stream in the target object

// Fields for a column pointer type

  • column_name (binary): The name of the column in the referenced object
  • column_index (int64): The index number of the column in the referenced object
  • values_bloom_filter (binary): A bloom filter for unique values seen in the referenced column

4. Index Pointers Section

Namespace: github.com/grafana/loki Kind: indexpointers Location: pkg/dataobj/sections/indexpointers/

Used in Table of Contents (toc) objects to point to index objects, via a time range lookup.

Columns:

  • path (binary): Path to the index file
  • min_time (int64): Minimum time covered by the referenced index object
  • max_time (int64): Maximum time covered by the referenced index object

Operational Components

Consumer

Location: pkg/dataobj/consumer/

The consumer reads log data from Kafka and builds data objects.

Key Features:

  • Reads from Kafka partitions
  • Accumulates logs into data objects
  • Flushes based on size or idle timeout
  • Commits offsets after successful upload
  • Emits metadata events containing a reference to each successfully uploaded object.

Metastore

Location: pkg/dataobj/metastore/

Manages an index of data objects and their contents for efficient querying.

The metastore serves queries by the following:

  1. Fetch and scan relevant Table of Contents (toc) files from the query time range to resolve index objects.
  2. Fetches resolved index objects and utilises the contained indexes (stream sections, blooms, etc.) to resolve log objects & metadata such as size and number of log lines.

Index Builder

Location: pkg/dataobj/index/

Creates index objects that contain indexes over data objects containing the logs.

Explorer Service

Location: pkg/dataobj/explorer/

HTTP service for inspecting data objects.


Usage Examples

Example 1: Building and Uploading a Data Object

package main

import (
    "context"
    "time"

    "github.com/grafana/loki/v3/pkg/dataobj"
    "github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
    "github.com/grafana/loki/v3/pkg/dataobj/uploader"
    "github.com/prometheus/prometheus/model/labels"
)

func main() {
    ctx := context.Background()

    // Create logs builder
    logsBuilder := logs.NewBuilder(nil, logs.BuilderOptions{
        PageSizeHint: 256 * 1024,
        BufferSize: 10 * 1024 * 1024,
        AppendStrategy: logs.AppendUnordered,
        SortOrder: logs.SortStreamASC,
    })

    // Append log records
    logsBuilder.Append(logs.Record{
        StreamID:  12345,
        Timestamp: time.Now(),
        Metadata:  labels.Labels{{Name: "level", Value: "error"}},
        Line:      []byte("error occurred"),
    })

    // Create data object
    objBuilder := dataobj.NewBuilder(nil)
    objBuilder.Append(logsBuilder)

    obj, closer, err := objBuilder.Flush()
    if err != nil {
        panic(err)
    }
    defer closer.Close()

    // Upload to object storage
    uploader := uploader.New(uploaderCfg, bucket, logger)
    objectPath, err := uploader.Upload(ctx, obj)
    if err != nil {
        panic(err)
    }

    println("Uploaded to:", objectPath)
}

Example 2: Reading and Querying Logs

package main

import (
    "context"
    "io"

    "github.com/apache/arrow-go/v18/arrow/scalar"
    "github.com/grafana/loki/v3/pkg/dataobj"
    "github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)

func main() {
    ctx := context.Background()

    // Open data object
    obj, err := dataobj.FromBucket(ctx, bucket, "objects/ab/cd123...")
    if err != nil {
        panic(err)
    }

    // Find logs sections
    var logsSection *dataobj.Section
    for _, sec := range obj.Sections() {
        if logs.CheckSection(sec) {
            logsSection = sec
            break
        }
    }

    // Open logs section
    section, err := logs.Open(ctx, logsSection)
    if err != nil {
        panic(err)
    }

    // Find columns
    var timestampCol, messageCol *logs.Column
    for _, col := range section.Columns() {
        switch col.Type {
        case logs.ColumnTypeTimestamp:
            timestampCol = col
        case logs.ColumnTypeMessage:
            messageCol = col
        }
    }

    // Create reader with predicate
    reader := logs.NewReader(logs.ReaderOptions{
        Columns: []*logs.Column{timestampCol, messageCol},
        Predicates: []logs.Predicate{
            logs.GreaterThanPredicate{
                Column: timestampCol,
                Value:  scalar.NewInt64Scalar(startTime.UnixNano()),
            },
        },
    })
    defer reader.Close()

    // Read batches
    for {
        batch, err := reader.Read(ctx, 1000)
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }

        // Process batch (Arrow record)
        for i := 0; i < int(batch.NumRows()); i++ {
            timestamp := batch.Column(0).(*array.Timestamp).Value(i)
            message := batch.Column(1).(*array.String).Value(i)
            println(timestamp, message)
        }

        batch.Release()
    }
}

Testing and Tools

Inspect Tool

# Inspect data object structure
go run ./pkg/dataobj/tools/inspect.go -path objects/ab/cd123...

# Show statistics
go run ./pkg/dataobj/tools/stats.go -path objects/ab/cd123...

Explorer Service

The explorer provides a web UI for browsing data objects:

# Start explorer
./loki -target=dataobj-explorer

# Access UI
curl http://localhost:3100/dataobj/api/v1/list