# DataObj Package Documentation
## Overview
The `dataobj` package provides a container format for storing and retrieving structured data from object storage. It's designed specifically for Loki's log storage needs, enabling efficient columnar storage and retrieval of log data with support for multiple tenants, metadata indexing, and flexible querying.
## Table of Contents
- [Architecture Overview](#architecture-overview)
- [Key Concepts](#key-concepts)
- [File Format Structure](#file-format-structure)
- [Encoding Process](#encoding-process)
- [Decoding Process](#decoding-process)
- [Section Types](#section-types)
- [Operational Components](#operational-components)
- [Creating Custom Sections](#creating-custom-sections)
- [Usage Examples](#usage-examples)
- [Testing and Tools](#testing-and-tools)
---
## Architecture Overview
The dataobj package provides a hierarchical container format:
```
┌─────────────────────────────────────┐
│ Data Object File │
├─────────────────────────────────────┤
│ Header (Magic: "THOR") │
├─────────────────────────────────────┤
│ Section 1 Data │
│ Section 2 Data │
│ ... │
│ Section N Data │
├─────────────────────────────────────┤
│ Section 1 Metadata │
│ Section 2 Metadata │
│ ... │
│ Section N Metadata │
├─────────────────────────────────────┤
│ File Metadata (Protobuf) │
│ - Dictionary │
│ - Section Types │
│ - Section Layout Info │
├─────────────────────────────────────┤
│ Footer │
│ - File Format Version │
│ - File Metadata Size (4 bytes) │
│ - Magic: "THOR" │
└─────────────────────────────────────┘
```
### Core Components
1. **Builder** - Constructs data objects by accumulating sections
2. **Encoder** - Low-level encoding of sections and metadata
3. **Decoder** - Low-level decoding of sections from object storage
4. **SectionReader** - Interface for reading data/metadata from sections
5. **Section Implementations** - Specific section types (logs, streams, pointers, etc.)
---
## Key Concepts
### Data Objects
A **Data Object** is a self-contained file stored in object storage containing:
- One or more **Sections**, each holding a specific type of data
- **File Metadata** describing all sections and their layout
- **Dictionary** for efficient string storage (namespaces, kinds, tenant IDs)
### Sections
**Sections** are the primary organizational unit within a data object:
- Each section has a **type** (namespace, kind, version)
- Sections contain both **data** and **metadata** regions
- Multiple sections of the same type can exist in one object
- Sections can be tenant-specific
### Section Regions
Each section is split into two regions:
1. **Data Region** - The actual encoded data (e.g., columnar log data)
2. **Metadata Region** - Lightweight metadata to aid in reading the data region
This separation allows reading section metadata without loading the entire data payload.
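To make the split concrete, here is an illustrative shape for a reader that exposes the two regions independently; the method names and signatures below are assumptions for the sketch, not necessarily the package's actual `SectionReader` API:
```go
// Hypothetical sketch only; the real SectionReader interface may differ.
package sketch

import (
	"context"
	"io"
)

type SectionReader interface {
	// DataRange streams length bytes of the section's data region starting at offset.
	DataRange(ctx context.Context, offset, length int64) (io.ReadCloser, error)

	// Metadata streams the section's (much smaller) metadata region.
	Metadata(ctx context.Context) (io.ReadCloser, error)
}
```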
### Columnar Storage
Most section implementations use columnar storage via the `internal/dataset` package:
- Data is organized into **columns** (e.g., timestamp, stream_id, message)
- Columns are split into **pages** for efficient random access
- Pages can be compressed (typically with zstd)
- Supports predicates for filtering during reads (the sketch below illustrates this layout)
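
The bullets above can be pictured with the following illustrative types (not the actual `internal/dataset` types): each column holds independently compressed pages, and per-page statistics are what make predicate-based filtering possible.
```go
// Illustrative shapes only; the real structures live in internal/dataset.
package sketch

type Column struct {
	Name  string // e.g. "timestamp", "stream_id", "message"
	Pages []Page
}

type Page struct {
	CompressedData []byte // zstd-compressed values for a contiguous run of rows
	RowCount       int
	MinValue       []byte // page-level statistics let the reader skip pages
	MaxValue       []byte // that cannot match a predicate
}
```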
---
## File Format Structure
### File Layout
```
Offset | Content
--------|----------------------------------------------------------
0 | Magic bytes: "THOR" (4 bytes)
4 | [Section Data Region - All sections concatenated]
... | [Section Metadata Region - All sections concatenated]
... | Format Version (varint)
... | File Metadata (protobuf-encoded)
-8 | Metadata Size (uint32, little-endian)
-4 | Magic bytes: "THOR" (4 bytes)
```
The file metadata structure (protobuf) is defined in the `pkg/dataobj/internal/metadata` package.
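As a rough illustration of the table above, the sketch below locates the file metadata by parsing the footer of an object that is already fully in memory. It follows the table literally and is only a sketch; the real decoder issues range reads against object storage instead (see the decoding section).
```go
package sketch

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

var magic = []byte("THOR")

// findMetadata locates the protobuf file metadata of an in-memory data object
// by walking the footer described in the layout table.
func findMetadata(obj []byte) (offset, size uint32, err error) {
	if len(obj) < 12 || !bytes.Equal(obj[len(obj)-4:], magic) {
		return 0, 0, fmt.Errorf("missing trailing magic")
	}
	// The 4 bytes before the trailing magic hold the metadata size (little-endian).
	size = binary.LittleEndian.Uint32(obj[len(obj)-8 : len(obj)-4])
	if int(size) > len(obj)-8 {
		return 0, 0, fmt.Errorf("metadata size %d out of bounds", size)
	}
	// The metadata region sits immediately before the 8-byte footer.
	offset = uint32(len(obj)) - 8 - size
	return offset, size, nil
}
```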
---
## Encoding Process
### High-Level Encoding Flow
```
┌──────────────┐
│ Log Records │
└──────┬───────┘
┌──────────────────┐
│ Section Builder │ (e.g., logs.Builder)
│ - Buffer records │
│ - Create stripes │
│ - Merge stripes │
└──────┬───────────┘
│ Flush
┌──────────────────┐
│ Columnar Encoder │
│ - Encode columns │
│ - Compress pages │
└──────┬───────────┘
│ WriteSection
┌──────────────────┐
│ dataobj.Builder │
│ - Append section │
│ - Build metadata │
└──────┬───────────┘
│ Flush
┌──────────────────┐
│ Snapshot │
│ - In-memory/disk │
│ - Ready to upload│
└──────┬───────────┘
┌──────────────────┐
│ Object Storage │
└──────────────────┘
```
### Encoding
#### 1. Section builders
A section builder:
1. Buffers records in memory up to `BufferSize`
2. When the buffer is full, sorts the buffered records and writes them out as a **stripe** (an intermediate compressed table)
3. Merges all accumulated stripes when the section is flushed (see the sketch below)
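
Below is a minimal sketch of that buffer-then-stripe pattern using hypothetical types; the real `logs.Builder` also tracks sort order, append strategy, and metrics, and encodes stripes in columnar form.
```go
package sketch

import "sort"

type record struct {
	StreamID  int64
	Timestamp int64
	Line      string
}

type sectionBuilder struct {
	bufferSize int      // configured BufferSize, in bytes
	buffered   []record // records waiting to be cut into a stripe
	bufferedSz int
	stripes    [][]byte // intermediate compressed tables
}

func (b *sectionBuilder) Append(r record) {
	b.buffered = append(b.buffered, r)
	b.bufferedSz += len(r.Line)
	if b.bufferedSz >= b.bufferSize {
		b.cutStripe()
	}
}

// cutStripe sorts the buffered records and stores them as one compressed stripe.
func (b *sectionBuilder) cutStripe() {
	sort.Slice(b.buffered, func(i, j int) bool {
		return b.buffered[i].StreamID < b.buffered[j].StreamID
	})
	b.stripes = append(b.stripes, encodeAndCompress(b.buffered))
	b.buffered, b.bufferedSz = nil, 0
}

// encodeAndCompress stands in for the real columnar encoding + zstd step.
func encodeAndCompress(rs []record) []byte { return nil }

// Flush (not shown) would merge all stripes, plus any remaining buffered
// records, into the final section.
```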
#### 2. Encoding to Columnar Format
When flushing, the builder:
1. Converts buffered records into columnar format
2. Creates separate columns for each field (stream_id, timestamp, metadata, message)
3. Splits columns into pages
4. Compresses each page independently (zstd)
#### 3. Writing to Data Object
The data object builder:
1. Collects data and metadata from each section
2. Builds file metadata with section layout information
The column metadata recorded in each section's metadata region includes:
- Page descriptors (offset, size, row count, compression)
- Column statistics (min/max values)
- Encoding information
#### 4. Compression Strategy
The encoding uses a multi-level compression strategy:
- **Stripes** (intermediate): zstd with `SpeedFastest` for quick buffering
- **Sections** (final): zstd with `SpeedDefault` for better compression (both levels are sketched below)
- **Ordered appends**: When logs arrive pre-sorted, the builder skips stripe creation entirely for better performance
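
`SpeedFastest` and `SpeedDefault` are encoder levels from `github.com/klauspost/compress/zstd`. The helper below is only a sketch of applying the two levels; the exact wiring inside the builders is not shown in this document.
```go
package sketch

import (
	"bytes"

	"github.com/klauspost/compress/zstd"
)

// compress compresses buf at the given zstd encoder level.
func compress(buf []byte, level zstd.EncoderLevel) ([]byte, error) {
	var out bytes.Buffer
	enc, err := zstd.NewWriter(&out, zstd.WithEncoderLevel(level))
	if err != nil {
		return nil, err
	}
	if _, err := enc.Write(buf); err != nil {
		enc.Close()
		return nil, err
	}
	if err := enc.Close(); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

// Intermediate stripes trade ratio for speed; final sections do the opposite:
//
//	stripe, _ := compress(data, zstd.SpeedFastest)  // quick buffering
//	section, _ := compress(data, zstd.SpeedDefault) // better compression
```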
---
## Decoding Process
### High-Level Decoding Flow
```
┌──────────────────┐
│ Object Storage │
└──────┬───────────┘
│ Open
┌──────────────────┐
│ dataobj.Object │
│ - Read metadata │
│ - Parse sections │
└──────┬───────────┘
│ Open Section
┌──────────────────┐
│ Section (e.g. │
│ logs.Section) │
│ - Decode metadata│
│ - List columns │
└──────┬───────────┘
│ NewReader
┌──────────────────┐
│ logs.Reader │
│ - Read pages │
│ - Apply filters │
│ - Decompress │
└──────┬───────────┘
│ Read batches
┌──────────────────┐
│ Arrow Records │
└──────────────────┘
```
### Detailed Decoding Steps
#### 1. Opening a Data Object
Opening process:
1. Reads last 16KB of file in one request (optimistic read for metadata)
2. Parses footer to find metadata offset and size
3. Reads and decodes file metadata
4. Constructs Section objects with SectionReaders
#### 2. Decoding File Metadata
The decoder:
1. Validates magic bytes ("THOR")
2. Reads metadata size from last 8 bytes
3. Seeks to metadata offset
4. Decodes format version and protobuf metadata
#### 3. Opening a Section
Section opening:
1. Validates section type and version
2. Reads extension data for quick access to section metadata
3. Decodes section metadata (columnar structure)
#### 4. Reading Data
Reading process:
1. Validates reader options
2. Maps predicates to internal dataset predicates
3. Reads pages in batches (with prefetching)
4. Applies predicates at page and row level
5. Converts to Arrow record batches
Reader optimizations:
- **Range coalescing**: Adjacent pages are read in a single request (see the sketch after this list)
- **Parallel reads**: Multiple pages can be fetched concurrently
- **Prefetching**: Pages are fetched ahead of time while processing current batch
- **Predicate pushdown**: Filters applied at page level using statistics
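
Range coalescing is a generic technique and can be sketched independently of the package's actual reader; the `byteRange` type and the gap handling below are simplified assumptions.
```go
package sketch

import "sort"

type byteRange struct{ Offset, Length int64 }

// coalesce merges byte ranges that touch or overlap so that adjacent pages
// can be fetched with a single object-storage request.
func coalesce(ranges []byteRange) []byteRange {
	if len(ranges) == 0 {
		return nil
	}
	sort.Slice(ranges, func(i, j int) bool { return ranges[i].Offset < ranges[j].Offset })

	out := []byteRange{ranges[0]}
	for _, r := range ranges[1:] {
		last := &out[len(out)-1]
		if r.Offset <= last.Offset+last.Length { // touching or overlapping
			if end := r.Offset + r.Length; end > last.Offset+last.Length {
				last.Length = end - last.Offset
			}
		} else {
			out = append(out, r)
		}
	}
	return out
}
```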
#### 5. Value Encoding Types
Values in pages can use different encodings:
- **Plain**: Raw values stored directly
- **Delta**: Store deltas between consecutive values (efficient for sorted data; sketched below)
- **Bitmap**: For repetition levels (null/non-null indicators)
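
Delta encoding is simple to sketch; this illustrates the idea rather than the package's wire format. Sorted timestamps delta-encode into small values that compress well.
```go
package sketch

// deltaEncode stores the first value as-is and each subsequent value as the
// difference from its predecessor.
func deltaEncode(values []int64) []int64 {
	out := make([]int64, len(values))
	prev := int64(0)
	for i, v := range values {
		out[i] = v - prev
		prev = v
	}
	return out
}

// deltaDecode reverses deltaEncode by accumulating the deltas.
func deltaDecode(deltas []int64) []int64 {
	out := make([]int64, len(deltas))
	prev := int64(0)
	for i, d := range deltas {
		prev += d
		out[i] = prev
	}
	return out
}
```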
---
## Section Types
The package provides several built-in section types:
### 1. Logs Section
**Namespace**: `github.com/grafana/loki`
**Kind**: `logs`
**Location**: `pkg/dataobj/sections/logs/`
Stores log records in columnar format.
**Columns:**
- `stream_id` (int64): Stream identifier
- `timestamp` (int64): Nanosecond timestamp
- `metadata` (binary): Structured metadata key-value pairs, one column per key
- `message` (binary): Log line content
### 2. Streams Section
**Namespace**: `github.com/grafana/loki`
**Kind**: `streams`
**Location**: `pkg/dataobj/sections/streams/`
Stores stream metadata and statistics.
**Columns:**
- `stream_id` (int64): Unique stream identifier
- `min_timestamp` (int64): Earliest timestamp in stream
- `max_timestamp` (int64): Latest timestamp in stream
- `labels` (binary): Label key-value pairs for the stream, one column per key
- `rows` (uint64): Number of log records
- `uncompressed_size` (uint64): Uncompressed data size
### 3. Pointers Section
**Namespace**: `github.com/grafana/loki`
**Kind**: `pointers`
**Location**: `pkg/dataobj/sections/pointers/`
Used in index objects to point to other objects that contain logs, via stream or predicate lookup.
**Columns:**
- `path` (binary): Path to data object in object storage
- `section` (int64): The section number within the referenced data object
- `pointer_kind` (int64): The type of pointer entry (stream or column); determines which of the following fields are populated
**Fields present for a stream pointer:**
- `stream_id` (int64): Stream identifier in this index object
- `stream_id_ref` (int64): Stream identifier in the target data object's stream section
- `min_timestamp` (int64): Min timestamp in the object for this stream
- `max_timestamp` (int64): Max timestamp in the object for this stream
- `row_count` (int64): Total rows for this stream in the target object
- `uncompressed_size` (int64): Total bytes for this stream in the target object
**Fields present for a column pointer:**
- `column_name` (binary): The name of the column in the referenced object
- `column_index` (int64): The index number of the column in the referenced object
- `values_bloom_filter` (binary): A bloom filter for unique values seen in the referenced column
### 4. Index Pointers Section
**Namespace**: `github.com/grafana/loki`
**Kind**: `indexpointers`
**Location**: `pkg/dataobj/sections/indexpointers/`
Used in Table of Contents (toc) objects to point to index objects, via a time range lookup.
**Columns:**
- `path` (binary): Path to the index file
- `min_time` (int64): Minimum time covered by the referenced index object
- `max_time` (int64): Maximum time covered by the referenced index object
---
## Operational Components
### Consumer
**Location**: `pkg/dataobj/consumer/`
The consumer reads log data from Kafka and builds data objects.
**Key Features:**
- Reads from Kafka partitions
- Accumulates logs into data objects
- Flushes based on size or idle timeout (see the sketch below)
- Commits offsets after successful upload
- Emits metadata events containing a reference to each successfully uploaded object.
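
The size/idle flush triggers can be sketched as follows; the field names and their meaning here are hypothetical, not the consumer's actual configuration.
```go
package sketch

import "time"

type flushPolicy struct {
	MaxSize     int           // flush once the builder holds this many bytes
	IdleTimeout time.Duration // flush if no appends arrive for this long
}

// shouldFlush reports whether either trigger has fired.
func (p flushPolicy) shouldFlush(bufferedBytes int, lastAppend time.Time) bool {
	return bufferedBytes >= p.MaxSize || time.Since(lastAppend) >= p.IdleTimeout
}
```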
### Metastore
**Location**: `pkg/dataobj/metastore/`
Manages an index of data objects and their contents for efficient querying.
The metastore serves queries as follows:
1. Fetches and scans the relevant Table of Contents (toc) objects for the query time range to resolve index objects.
2. Fetches the resolved index objects and uses the contained indexes (stream sections, blooms, etc.) to resolve log objects and metadata such as size and number of log lines.
### Index Builder
**Location**: `pkg/dataobj/index/`
Builds index objects, which contain indexes over the data objects that hold the logs.
### Explorer Service
**Location**: `pkg/dataobj/explorer/`
HTTP service for inspecting data objects.
---
## Usage Examples
### Example 1: Building and Uploading a Data Object
```go
package main

import (
	"context"
	"time"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	ctx := context.Background()

	// Create logs builder
	logsBuilder := logs.NewBuilder(nil, logs.BuilderOptions{
		PageSizeHint:   256 * 1024,
		BufferSize:     10 * 1024 * 1024,
		AppendStrategy: logs.AppendUnordered,
		SortOrder:      logs.SortStreamASC,
	})

	// Append log records
	logsBuilder.Append(logs.Record{
		StreamID:  12345,
		Timestamp: time.Now(),
		Metadata:  labels.Labels{{Name: "level", Value: "error"}},
		Line:      []byte("error occurred"),
	})

	// Create data object
	objBuilder := dataobj.NewBuilder(nil)
	objBuilder.Append(logsBuilder)
	obj, closer, err := objBuilder.Flush()
	if err != nil {
		panic(err)
	}
	defer closer.Close()

	// Upload to object storage.
	// uploaderCfg, bucket, and logger are assumed to be initialized elsewhere.
	uploader := uploader.New(uploaderCfg, bucket, logger)
	objectPath, err := uploader.Upload(ctx, obj)
	if err != nil {
		panic(err)
	}
	println("Uploaded to:", objectPath)
}
```
### Example 2: Reading and Querying Logs
```go
package main

import (
	"context"
	"io"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/scalar"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
)

func main() {
	ctx := context.Background()

	// Open data object.
	// bucket is assumed to be an initialized object-storage client.
	obj, err := dataobj.FromBucket(ctx, bucket, "objects/ab/cd123...")
	if err != nil {
		panic(err)
	}

	// Find the first logs section
	var logsSection *dataobj.Section
	for _, sec := range obj.Sections() {
		if logs.CheckSection(sec) {
			logsSection = sec
			break
		}
	}

	// Open logs section
	section, err := logs.Open(ctx, logsSection)
	if err != nil {
		panic(err)
	}

	// Find columns
	var timestampCol, messageCol *logs.Column
	for _, col := range section.Columns() {
		switch col.Type {
		case logs.ColumnTypeTimestamp:
			timestampCol = col
		case logs.ColumnTypeMessage:
			messageCol = col
		}
	}

	// Create reader with predicate.
	// startTime is assumed to be the lower bound of the query time range.
	reader := logs.NewReader(logs.ReaderOptions{
		Columns: []*logs.Column{timestampCol, messageCol},
		Predicates: []logs.Predicate{
			logs.GreaterThanPredicate{
				Column: timestampCol,
				Value:  scalar.NewInt64Scalar(startTime.UnixNano()),
			},
		},
	})
	defer reader.Close()

	// Read batches
	for {
		batch, err := reader.Read(ctx, 1000)
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}

		// Process batch (Arrow record)
		for i := 0; i < int(batch.NumRows()); i++ {
			timestamp := batch.Column(0).(*array.Timestamp).Value(i)
			message := batch.Column(1).(*array.String).Value(i)
			println(timestamp, message)
		}
		batch.Release()
	}
}
```
---
## Testing and Tools
### Inspect Tool
```bash
# Inspect data object structure
go run ./pkg/dataobj/tools/inspect.go -path objects/ab/cd123...
# Show statistics
go run ./pkg/dataobj/tools/stats.go -path objects/ab/cd123...
```
### Explorer Service
The explorer provides a web UI for browsing data objects:
```bash
# Start explorer
./loki -target=dataobj-explorer
# Access UI
curl http://localhost:3100/dataobj/api/v1/list
```