DataObj Package Documentation
Overview
The dataobj package provides a container format for storing and retrieving structured data from object storage. It's designed specifically for Loki's log storage needs, enabling efficient columnar storage and retrieval of log data with support for multiple tenants, metadata indexing, and flexible querying.
Table of Contents
- Architecture Overview
- Key Concepts
- File Format Structure
- Encoding Process
- Decoding Process
- Section Types
- Operational Components
- Creating Custom Sections
- Usage Examples
Architecture Overview
The dataobj package provides a hierarchical container format:
┌─────────────────────────────────────┐
│ Data Object File │
├─────────────────────────────────────┤
│ Header (Magic: "THOR") │
├─────────────────────────────────────┤
│ Section 1 Data │
│ Section 2 Data │
│ ... │
│ Section N Data │
├─────────────────────────────────────┤
│ Section 1 Metadata │
│ Section 2 Metadata │
│ ... │
│ Section N Metadata │
├─────────────────────────────────────┤
│ File Metadata (Protobuf) │
│ - Dictionary │
│ - Section Types │
│ - Section Layout Info │
├─────────────────────────────────────┤
│ Footer │
│ - File Format Version │
│ - File Metadata Size (4 bytes) │
│ - Magic: "THOR" │
└─────────────────────────────────────┘
Core Components
- Builder - Constructs data objects by accumulating sections
- Encoder - Low-level encoding of sections and metadata
- Decoder - Low-level decoding of sections from object storage
- SectionReader - Interface for reading data/metadata from sections
- Section Implementations - Specific section types (logs, streams, pointers, etc.)
Key Concepts
Data Objects
A Data Object is a self-contained file stored in object storage containing:
- One or more Sections, each holding a specific type of data
- File Metadata describing all sections and their layout
- Dictionary for efficient string storage (namespaces, kinds, tenant IDs)
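The dictionary exists to deduplicate strings that repeat across sections: each distinct value is stored once and referenced by a small index. A toy sketch of the interning idea in Go; the names here are illustrative and assume nothing about the package's actual wire encoding:

package sketch

// dictionary interns strings (namespaces, kinds, tenant IDs) so each
// distinct value is stored once and referenced by an index.
type dictionary struct {
	byValue map[string]uint32
	values  []string
}

// intern returns a stable index for s, adding it on first use.
func (d *dictionary) intern(s string) uint32 {
	if d.byValue == nil {
		d.byValue = make(map[string]uint32)
	}
	if idx, ok := d.byValue[s]; ok {
		return idx // already stored; reuse the index
	}
	idx := uint32(len(d.values))
	d.byValue[s] = idx
	d.values = append(d.values, s)
	return idx
}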
Sections
Sections are the primary organizational unit within a data object:
- Each section has a type (namespace, kind, version)
- Sections contain both data and metadata regions
- Multiple sections of the same type can exist in one object
- Sections can be tenant-specific
Section Regions
Each section is split into two regions:
- Data Region - The actual encoded data (e.g., columnar log data)
- Metadata Region - Lightweight metadata to aid in reading the data region
This separation allows reading section metadata without loading the entire data payload.
Columnar Storage
Most section implementations use columnar storage via the internal/dataset package:
- Data is organized into columns (e.g., timestamp, stream_id, message)
- Columns are split into pages for efficient random access
- Pages can be compressed (typically with zstd)
- Supports predicates for filtering during reads
File Format Structure
File Layout
Offset | Content
--------|----------------------------------------------------------
0 | Magic bytes: "THOR" (4 bytes)
4 | [Section Data Region - All sections concatenated]
... | [Section Metadata Region - All sections concatenated]
... | Format Version (varint)
... | File Metadata (protobuf-encoded)
-8 | Metadata Size (uint32, little-endian)
-4 | Magic bytes: "THOR" (4 bytes)
File Metadata Structure (Protobuf) can be found in the pkg/dataobj/internal/metadata package.
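As a concrete illustration of the trailer layout, here is a minimal sketch that validates the magic bytes and extracts the metadata bounds from a fully buffered file. It assumes the size field counts only the protobuf metadata; the real decoder works against ranged reads rather than whole files:

package sketch

import (
	"encoding/binary"
	"fmt"
)

var magic = []byte("THOR")

// metadataBounds locates the file metadata using the trailer: the last 8
// bytes hold a little-endian uint32 metadata size followed by the magic.
func metadataBounds(file []byte) (offset, size uint32, err error) {
	if len(file) < 8 || string(file[len(file)-4:]) != string(magic) {
		return 0, 0, fmt.Errorf("invalid footer magic")
	}
	size = binary.LittleEndian.Uint32(file[len(file)-8 : len(file)-4])
	offset = uint32(len(file)) - 8 - size // metadata ends at the trailer
	return offset, size, nil
}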
Encoding Process
High-Level Encoding Flow
┌──────────────┐
│ Log Records │
└──────┬───────┘
│
▼
┌──────────────────┐
│ Section Builder │ (e.g., logs.Builder)
│ - Buffer records │
│ - Create stripes │
│ - Merge stripes │
└──────┬───────────┘
│
│ Flush
▼
┌──────────────────┐
│ Columnar Encoder │
│ - Encode columns │
│ - Compress pages │
└──────┬───────────┘
│
│ WriteSection
▼
┌──────────────────┐
│ dataobj.Builder │
│ - Append section │
│ - Build metadata │
└──────┬───────────┘
│
│ Flush
▼
┌──────────────────┐
│ Snapshot │
│ - In-memory/disk │
│ - Ready to upload│
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Object Storage │
└──────────────────┘
Detailed Encoding Steps
1. Section Builders
A section builder:
- Buffers records in memory up to BufferSize
- When buffer is full, sorts records and creates a stripe
- Stripes are intermediate compressed tables
- Multiple stripes are merged when flushing the section
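A minimal sketch of this buffer-then-stripe pattern; the names (record, flushStripe) are illustrative, not the package's API:

package sketch

import "sort"

type record struct {
	streamID, timestamp int64
	line                []byte
}

type builder struct {
	bufferSize int // flush threshold in bytes
	buffered   int
	records    []record
	stripes    [][]record // intermediate sorted tables, merged at flush
}

func (b *builder) append(r record) {
	b.records = append(b.records, r)
	b.buffered += len(r.line)
	if b.buffered >= b.bufferSize {
		b.flushStripe()
	}
}

// flushStripe sorts the buffered records and sets them aside as a stripe.
func (b *builder) flushStripe() {
	sort.Slice(b.records, func(i, j int) bool {
		return b.records[i].timestamp < b.records[j].timestamp
	})
	b.stripes = append(b.stripes, b.records)
	b.records, b.buffered = nil, 0
}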
2. Encoding to Columnar Format
When flushing, the builder:
- Converts buffered records into columnar format
- Creates separate columns for each field (stream_id, timestamp, metadata, message)
- Splits columns into pages
- Compresses each page independently (zstd)
3. Writing to Data Object
The data object builder:
- Collects data and metadata from each section
- Builds file metadata with section layout information
Column metadata includes:
- Page descriptors (offset, size, row count, compression)
- Column statistics (min/max values)
- Encoding information
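The min/max statistics are what make predicate pushdown possible later: a page whose value range cannot satisfy a filter is never fetched. A sketch with illustrative field names:

package sketch

// pageDescriptor mirrors the idea of per-page metadata: where the page
// lives and what value range it covers.
type pageDescriptor struct {
	offset, size int64
	rowCount     int
	minValue     int64 // e.g. smallest timestamp in the page
	maxValue     int64
}

// pagesToRead keeps only pages that may contain values greater than
// threshold; the rest are skipped without being fetched or decompressed.
func pagesToRead(pages []pageDescriptor, threshold int64) []pageDescriptor {
	var keep []pageDescriptor
	for _, p := range pages {
		if p.maxValue > threshold {
			keep = append(keep, p)
		}
	}
	return keep
}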
4. Compression Strategy
The encoding uses a multi-level compression strategy:
- Stripes (intermediate): zstd with SpeedFastest for quick buffering
- Sections (final): zstd with SpeedDefault for better compression
- Ordered Appends: When logs arrive pre-sorted, skips stripe creation for better performance
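The SpeedFastest and SpeedDefault names match the encoder levels in github.com/klauspost/compress/zstd; assuming that package, the two-level strategy looks roughly like this:

package sketch

import "github.com/klauspost/compress/zstd"

// compressPage compresses a buffer at the given level. Stripes would use
// zstd.SpeedFastest (cheap, repeated work); final sections would use
// zstd.SpeedDefault (better ratio, done once).
func compressPage(page []byte, level zstd.EncoderLevel) ([]byte, error) {
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level))
	if err != nil {
		return nil, err
	}
	defer enc.Close()
	return enc.EncodeAll(page, nil), nil
}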
Decoding Process
High-Level Decoding Flow
┌──────────────────┐
│ Object Storage │
└──────┬───────────┘
│
│ Open
▼
┌──────────────────┐
│ dataobj.Object │
│ - Read metadata │
│ - Parse sections │
└──────┬───────────┘
│
│ Open Section
▼
┌──────────────────┐
│ Section (e.g. │
│ logs.Section) │
│ - Decode metadata│
│ - List columns │
└──────┬───────────┘
│
│ NewReader
▼
┌──────────────────┐
│ logs.Reader │
│ - Read pages │
│ - Apply filters │
│ - Decompress │
└──────┬───────────┘
│
│ Read batches
▼
┌──────────────────┐
│ Arrow Records │
└──────────────────┘
Detailed Decoding Steps
1. Opening a Data Object
Opening process:
- Reads last 16KB of file in one request (optimistic read for metadata)
- Parses footer to find metadata offset and size
- Reads and decodes file metadata
- Constructs Section objects with SectionReaders
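A sketch of the optimistic tail read, where readRange stands in for an object storage ranged GET; a second request is needed only when the metadata is larger than the 16KB tail:

package sketch

import (
	"context"
	"encoding/binary"
)

const tailSize = 16 * 1024

// readMetadata fetches the last 16KB and, using the trailer (uint32 size +
// magic in the final 8 bytes), returns the metadata bytes, issuing a second
// ranged read only when the metadata does not fit in the tail.
func readMetadata(ctx context.Context, objectSize int64, readRange func(context.Context, int64, int64) ([]byte, error)) ([]byte, error) {
	off := max(objectSize-tailSize, 0)
	tail, err := readRange(ctx, off, objectSize-off)
	if err != nil {
		return nil, err
	}
	size := int64(binary.LittleEndian.Uint32(tail[len(tail)-8 : len(tail)-4]))
	if size+8 <= int64(len(tail)) {
		// The optimistic read already covers the metadata.
		return tail[int64(len(tail))-8-size : int64(len(tail))-8], nil
	}
	return readRange(ctx, objectSize-8-size, size)
}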
2. Decoding File Metadata
The decoder:
- Validates magic bytes ("THOR")
- Reads metadata size from last 8 bytes
- Seeks to metadata offset
- Decodes format version and protobuf metadata
3. Opening a Section
Section opening:
- Validates section type and version
- Reads extension data for quick access to section metadata
- Decodes section metadata (columnar structure)
4. Reading Data
Reading process:
- Validates reader options
- Maps predicates to internal dataset predicates
- Reads pages in batches (with prefetching)
- Applies predicates at page and row level
- Converts to Arrow record batches
Reader optimizations:
- Range coalescing: Adjacent pages are read in a single request
- Parallel reads: Multiple pages can be fetched concurrently
- Prefetching: Pages are fetched ahead of time while processing current batch
- Predicate pushdown: Filters applied at page level using statistics
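A sketch of range coalescing, the first optimization above: sort the page byte ranges and merge those that overlap or sit within maxGap of each other, so several pages come back in a single request:

package sketch

import "sort"

type byteRange struct{ off, length int64 }

// coalesce merges overlapping or near-adjacent ranges (gap <= maxGap) so
// several pages can be fetched with one ranged request.
func coalesce(ranges []byteRange, maxGap int64) []byteRange {
	if len(ranges) == 0 {
		return nil
	}
	sort.Slice(ranges, func(i, j int) bool { return ranges[i].off < ranges[j].off })
	out := []byteRange{ranges[0]}
	for _, r := range ranges[1:] {
		last := &out[len(out)-1]
		if r.off <= last.off+last.length+maxGap {
			if end := r.off + r.length; end > last.off+last.length {
				last.length = end - last.off // extend the previous range
			}
		} else {
			out = append(out, r)
		}
	}
	return out
}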
5. Value Encoding Types
Values in pages can use different encodings:
- Plain: Raw values stored directly
- Delta: Store deltas between consecutive values (efficient for sorted data)
- Bitmap: For repetition levels (null/non-null indicators)
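Delta encoding is easiest to see with sorted timestamps; this sketch illustrates the idea, not the exact page format:

package sketch

// deltaEncode stores differences between consecutive values; for sorted
// data the deltas stay small, which downstream varint coding exploits.
func deltaEncode(values []int64) []int64 {
	deltas := make([]int64, len(values))
	var prev int64
	for i, v := range values {
		deltas[i] = v - prev
		prev = v
	}
	return deltas
}

// deltaDecode reverses the transform by accumulating the deltas.
func deltaDecode(deltas []int64) []int64 {
	values := make([]int64, len(deltas))
	var acc int64
	for i, d := range deltas {
		acc += d
		values[i] = acc
	}
	return values
}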
Section Types
The package provides several built-in section types:
1. Logs Section
Namespace: github.com/grafana/loki
Kind: logs
Location: pkg/dataobj/sections/logs/
Stores log records in columnar format.
Columns:
- stream_id (int64): Stream identifier
- timestamp (int64): Nanosecond timestamp
- metadata (binary): Structured metadata key-value pairs, one column per key
- message (binary): Log line content
2. Streams Section
Namespace: github.com/grafana/loki
Kind: streams
Location: pkg/dataobj/sections/streams/
Stores stream metadata and statistics.
Columns:
- stream_id (int64): Unique stream identifier
- min_timestamp (int64): Earliest timestamp in stream
- max_timestamp (int64): Latest timestamp in stream
- labels (binary): Label key-value pairs for the stream, one column per key
- rows (uint64): Number of log records
- uncompressed_size (uint64): Uncompressed data size
3. Pointers Section
Namespace: github.com/grafana/loki
Kind: pointers
Location: pkg/dataobj/sections/pointers/
Used in index objects to point to other objects that contain logs, via stream or predicate lookup.
Columns:
- path (binary): Path to data object in object storage
- section (int64): The section number within the referenced data object
- pointer_kind (int64): The type of pointer entry, either stream or column. Determines which fields are set in the section.
// Fields present for a stream pointer type
- stream_id (int64): Stream identifier in this index object
- stream_id_ref (int64): Stream identifier in the target data object's stream section
- min_timestamp (int64): Min timestamp in the object for this stream
- max_timestamp (int64): Max timestamp in the object for this stream
- row_count (int64): Total rows for this stream in the target object
- uncompressed_size (int64): Total bytes for this stream in the target object
// Fields for a column pointer type
- column_name (binary): The name of the column in the referenced object
- column_index (int64): The index number of the column in the referenced object
- values_bloom_filter (binary): A bloom filter for unique values seen in the referenced column
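At query time the bloom filter prunes objects: a negative answer is definitive, so the referenced object can be skipped. A toy sketch using a single FNV hash (real bloom filters use several hash functions):

package sketch

import "hash/fnv"

// bloom interprets the values_bloom_filter bytes as a bit set.
type bloom []byte

// mayContain returns false only when the value is definitely absent from
// the referenced column, letting the reader skip that object entirely.
func (b bloom) mayContain(value []byte) bool {
	h := fnv.New64a()
	h.Write(value)
	bit := h.Sum64() % uint64(len(b)*8)
	return b[bit/8]&(1<<(bit%8)) != 0
}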
4. Index Pointers Section
Namespace: github.com/grafana/loki
Kind: indexpointers
Location: pkg/dataobj/sections/indexpointers/
Used in Table of Contents (toc) objects to point to index objects, via a time range lookup.
Columns:
- path (binary): Path to the index file
- min_time (int64): Minimum time covered by the referenced index object
- max_time (int64): Maximum time covered by the referenced index object
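The time range lookup reduces to an interval-overlap test; a sketch with illustrative types:

package sketch

type indexPointer struct {
	path             string
	minTime, maxTime int64
}

// resolveIndexes returns the paths of index objects whose time range
// overlaps the query range [queryStart, queryEnd].
func resolveIndexes(pointers []indexPointer, queryStart, queryEnd int64) []string {
	var paths []string
	for _, p := range pointers {
		if p.minTime <= queryEnd && p.maxTime >= queryStart {
			paths = append(paths, p.path)
		}
	}
	return paths
}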
Operational Components
Consumer
Location: pkg/dataobj/consumer/
The consumer reads log data from Kafka and builds data objects.
Key Features:
- Reads from Kafka partitions
- Accumulates logs into data objects
- Flushes based on size or idle timeout
- Commits offsets after successful upload
- Emits metadata events containing a reference to each successfully uploaded object.
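A sketch of the size-or-idle flush policy; flush is a hypothetical hook, and the real consumer also uploads the object and commits Kafka offsets afterwards:

package sketch

import "time"

// consume buffers incoming records and flushes when either maxSize bytes
// accumulate or no record arrives for the idle duration.
func consume(records <-chan []byte, maxSize int, idle time.Duration, flush func()) {
	var buffered int
	timer := time.NewTimer(idle)
	defer timer.Stop()
	for {
		select {
		case rec, ok := <-records:
			if !ok {
				flush() // drain remaining data on shutdown
				return
			}
			if buffered += len(rec); buffered >= maxSize {
				flush()
				buffered = 0
			}
			timer.Reset(idle)
		case <-timer.C:
			if buffered > 0 {
				flush() // idle timeout: flush a partial object
				buffered = 0
			}
			timer.Reset(idle)
		}
	}
}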
Metastore
Location: pkg/dataobj/metastore/
Manages an index of data objects and their contents for efficient querying.
The metastore serves queries as follows:
- Fetches and scans the relevant Table of Contents (toc) files for the query time range to resolve index objects
- Fetches the resolved index objects and uses the contained indexes (stream sections, blooms, etc.) to resolve log objects and metadata such as size and number of log lines
Index Builder
Location: pkg/dataobj/index/
Builds index objects that contain indexes over the data objects holding the logs.
Explorer Service
Location: pkg/dataobj/explorer/
HTTP service for inspecting data objects.
Usage Examples
Example 1: Building and Uploading a Data Object
package main
import (
"context"
"time"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/prometheus/prometheus/model/labels"
)
func main() {
ctx := context.Background()
// Create logs builder
logsBuilder := logs.NewBuilder(nil, logs.BuilderOptions{
PageSizeHint: 256 * 1024,
BufferSize: 10 * 1024 * 1024,
AppendStrategy: logs.AppendUnordered,
SortOrder: logs.SortStreamASC,
})
// Append log records
logsBuilder.Append(logs.Record{
StreamID: 12345,
Timestamp: time.Now(),
Metadata: labels.FromStrings("level", "error"),
Line: []byte("error occurred"),
})
// Create data object
objBuilder := dataobj.NewBuilder(nil)
objBuilder.Append(logsBuilder)
obj, closer, err := objBuilder.Flush()
if err != nil {
panic(err)
}
defer closer.Close()
// Upload to object storage (uploaderCfg, bucket, and logger are assumed
// to be initialized elsewhere)
up := uploader.New(uploaderCfg, bucket, logger)
objectPath, err := up.Upload(ctx, obj)
if err != nil {
panic(err)
}
println("Uploaded to:", objectPath)
}
Example 2: Reading and Querying Logs
package main
import (
"context"
"io"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)
func main() {
ctx := context.Background()
// Open data object (bucket is an assumed, pre-configured object storage bucket)
obj, err := dataobj.FromBucket(ctx, bucket, "objects/ab/cd123...")
if err != nil {
panic(err)
}
// Find logs sections
var logsSection *dataobj.Section
for _, sec := range obj.Sections() {
if logs.CheckSection(sec) {
logsSection = sec
break
}
}
// Open logs section
section, err := logs.Open(ctx, logsSection)
if err != nil {
panic(err)
}
// Find columns
var timestampCol, messageCol *logs.Column
for _, col := range section.Columns() {
switch col.Type {
case logs.ColumnTypeTimestamp:
timestampCol = col
case logs.ColumnTypeMessage:
messageCol = col
}
}
// Create reader with predicate (startTime is the assumed query lower bound)
reader := logs.NewReader(logs.ReaderOptions{
Columns: []*logs.Column{timestampCol, messageCol},
Predicates: []logs.Predicate{
logs.GreaterThanPredicate{
Column: timestampCol,
Value: scalar.NewInt64Scalar(startTime.UnixNano()),
},
},
})
defer reader.Close()
// Read batches
for {
batch, err := reader.Read(ctx, 1000)
if err == io.EOF {
break
}
if err != nil {
panic(err)
}
// Process batch (Arrow record)
for i := 0; i < int(batch.NumRows()); i++ {
timestamp := batch.Column(0).(*array.Timestamp).Value(i)
message := batch.Column(1).(*array.String).Value(i)
println(timestamp, message)
}
batch.Release()
}
}
Testing and Tools
Inspect Tool
# Inspect data object structure
go run ./pkg/dataobj/tools/inspect.go -path objects/ab/cd123...
# Show statistics
go run ./pkg/dataobj/tools/stats.go -path objects/ab/cd123...
Explorer Service
The explorer provides a web UI for browsing data objects:
# Start explorer
./loki -target=dataobj-explorer
# Access UI
curl http://localhost:3100/dataobj/api/v1/list