// Package dataobj holds utilities for working with data objects.
//
// Data objects are a container format for storing data intended to be
// retrieved from object storage. Each data object is composed of one or more
// "sections," each of which contains a specific type of data, such as logs
// stored in a columnar format.
//
// Sections are further split into two "regions": the section data and the
// section metadata. Section metadata is intended to be a lightweight payload
// per section (usually protobuf) which aids in reading smaller portions of the
// data region.
//
// Each section has a type indicating what kind of data was encoded in that
// section. A data object may have multiple sections of the same type.
//
// The dataobj package provides a low-level [Builder] interface for composing
// sections into a dataobj, and a [SectionReader] interface for reading encoded
// sections.
//
// Section implementations are stored in their own packages and provide
// higher-level utilities for writing and reading those sections. See
// [github.com/grafana/loki/v3/pkg/dataobj/sections/logs] for an example.
//
// # Creating a new section implementation
//
// To create a new section implementation:
//
//  1. Create a new package for your section.
//
//  2. Create a new [SectionType] for your section. Pick a combination of
//     namespace and kind that avoids collisions with other sections that may
//     be written to the same data object.
//
//  3. Create a "Builder" type which implements [SectionBuilder]. Your builder
//     type should have methods for buffering data to be appended, and should
//     encode the buffered data when Flush is called.
//
//  4. Create higher-level reading APIs on top of [SectionReader] that decode
//     the data written by your builder.
//
// Then, callers can create an instance of your Builder and store it in a data
// object using [Builder.Append], and read it back using your higher-level
// APIs.
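//
// For example, step 2 for a hypothetical "widgets" section package might
// declare its type as a package-level variable. This is only a sketch: it
// assumes [SectionType] exposes Namespace and Kind fields, and the variable
// name and both values are placeholders.
//
//	// sectionType identifies sections written by this hypothetical package.
//	// The Namespace and Kind values are placeholders for illustration.
//	var sectionType = dataobj.SectionType{
//		Namespace: "example.com/widgets",
//		Kind:      "widgets",
//	}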
//
// While not required, it is typical for section packages to additionally
// implement:
//
//   - A package-level CheckSection function to check if a [Section] was built
//     with your package.
//
//   - A package-specific Section type that wraps [Section] to use in your
//     reading APIs.
//
//   - A function which returns the estimated size of something to be appended
//     to your builder, so callers can flush the section once it gets big enough.
//
//   - A method on your builder to report the estimated size of the section, both
//     before and after compression.
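//
// A minimal CheckSection, for instance, can compare a section's type against
// the type your package declares. This sketch assumes a hypothetical
// package-level variable named sectionType holding the [SectionType] created
// in step 2, and that [SectionType] values are comparable with ==:
//
//	// CheckSection reports whether section was written by this package.
//	func CheckSection(section *dataobj.Section) bool {
//		return section.Type == sectionType
//	}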
//
// # Encoding and decoding
//
// There are no requirements on how to encode or decode a section, but there
// are recommendations:
//
//   - Use protobuf for representing the metadata of the section.
//
//   - Write section data so that it can be read in smaller units. For example,
//     columnar data is split into pages, each of which can be read
//     independently.
//
// [SectionReader]s cannot access data outside of their section. Calling
// DataRange with an offset of 0 reads data from the beginning of that
// section's data region, not from the start of the dataobj.
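//
// These semantics are analogous to [io.SectionReader] from the standard
// library. The snippet below is only an illustration of section-relative
// offsets and is not how dataobj itself reads sections: offsets passed to
// ReadAt are relative to the start of the section, not to the start of the
// underlying data.
//
//	data := []byte("headerSECTIONtrailer")
//	// The section starts at byte 6 of data and is 7 bytes long.
//	sec := io.NewSectionReader(bytes.NewReader(data), 6, 7)
//	buf := make([]byte, 7)
//	n, err := sec.ReadAt(buf, 0) // offset 0 maps to byte 6 of data
//	// n == 7, err == nil, string(buf) == "SECTION"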
package dataobj

import (
	"context"
	"fmt"
	"io"

	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
)

// An Object is a representation of a data object.
type Object struct {
	rr       rangeReader
	dec      *decoder
	size     int64
	metadata *filemd.Metadata
	sections []*Section
	tenants  []string
}

// FromBucket opens an Object from the given storage bucket and path.
// FromBucket returns an error if the size or metadata of the Object cannot be
// read, or if ctx is canceled or times out.
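//
// A minimal usage sketch, assuming bucket is an already-configured
// [objstore.BucketReader] (in tests, objstore.NewInMemBucket returns a bucket
// that satisfies it) and that the object path is a placeholder:
//
//	obj, err := dataobj.FromBucket(ctx, bucket, "path/to/object")
//	if err != nil {
//		return err
//	}
//	for _, sec := range obj.Sections() {
//		fmt.Printf("section type=%v tenant=%q\n", sec.Type, sec.Tenant)
//	}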
func FromBucket(ctx context.Context, bucket objstore.BucketReader, path string) (*Object, error) {
	rr := &bucketRangeReader{bucket: bucket, path: path}
	size, err := rr.Size(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting size: %w", err)
	}

	dec := &decoder{rr: rr, size: size}
	obj := &Object{rr: rr, dec: dec, size: size}
	if err := obj.init(ctx); err != nil {
		return nil, err
	}
	return obj, nil
}

// FromReaderAt opens an Object from r. The size argument specifies the size of
// the data object in bytes. FromReaderAt returns an error if the metadata of
// the Object cannot be read. FromReaderAt does not accept a context, so reading
// the metadata cannot be canceled.
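//
// For example, assuming buf is a []byte holding the complete encoded data
// object in memory (for instance, read from a local file):
//
//	obj, err := dataobj.FromReaderAt(bytes.NewReader(buf), int64(len(buf)))
//	if err != nil {
//		return err
//	}
//	fmt.Println(obj.Size(), len(obj.Sections()))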
func FromReaderAt(r io.ReaderAt, size int64) (*Object, error) {
	rr := &readerAtRangeReader{size: size, r: r}
	dec := &decoder{rr: rr, size: size}
	obj := &Object{rr: rr, dec: dec, size: size}
	if err := obj.init(context.Background()); err != nil {
		return nil, err
	}
	return obj, nil
}

// init decodes the object's file metadata and populates the list of sections
// and the set of tenants that own them.
func (o *Object) init(ctx context.Context) error {
	metadata, err := o.dec.Metadata(ctx)
	if err != nil {
		return fmt.Errorf("reading metadata: %w", err)
	}

	sections := make([]*Section, 0, len(metadata.Sections))
	tenants := make(map[string]struct{}) // deduplicates tenant names

	for i, sec := range metadata.Sections {
		typ, err := getSectionType(metadata, sec)
		if err != nil {
			return fmt.Errorf("getting section %d type: %w", i, err)
		}

		// Each section references its tenant name through the metadata
		// dictionary.
		tenant := metadata.Dictionary[sec.TenantRef]
		sections = append(sections, &Section{
			Type:   typ,
			Reader: o.dec.SectionReader(metadata, sec, sec.ExtensionData),
			Tenant: tenant,
		})
		tenants[tenant] = struct{}{}
	}

	o.metadata = metadata
	o.sections = sections

	// Map iteration order is randomized, so the order of o.tenants is
	// unspecified.
	o.tenants = make([]string, 0, len(tenants))
	for tenant := range tenants {
		o.tenants = append(o.tenants, tenant)
	}
	return nil
}

// Size returns the size of the data object in bytes.
func (o *Object) Size() int64 { return o.size }

// Sections returns the list of sections available in the Object. The slice of
// returned sections must not be mutated.
func (o *Object) Sections() Sections { return o.sections }

// Tenants returns the list of tenants that have sections in the Object. The
// slice of returned tenants must not be mutated.
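//
// The tenants are collected from a map, so their order is unspecified and may
// differ each time the same object is opened. To process tenants in a stable
// order, sort a copy rather than the returned slice (this sketch uses the
// standard library slices package, available since Go 1.21):
//
//	tenants := slices.Clone(obj.Tenants())
//	slices.Sort(tenants)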
func (o *Object) Tenants() []string { return o.tenants }

// Reader returns a reader for the entire raw data object.
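//
// As with any [io.ReadCloser], callers should close the returned reader when
// done. For example, to copy the raw object to dst, a placeholder
// [io.Writer] supplied by the caller:
//
//	rc, err := obj.Reader(ctx)
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	if _, err := io.Copy(dst, rc); err != nil {
//		return err
//	}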
func (o *Object) Reader(ctx context.Context) (io.ReadCloser, error) {
	return o.rr.Read(ctx)
}