Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/dataobj/sections/internal/columnar/decoder.go

180 lines
5.0 KiB

package columnar
import (
"bytes"
"context"
"fmt"
"io"
"time"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/protocodec"
"github.com/grafana/loki/v3/pkg/util/rangeio"
"github.com/grafana/loki/v3/pkg/xcap"
)
// Decoder reads an encoded dataset-based section through a
// [dataobj.SectionReader]. Construct one with [NewDecoder].
type Decoder struct {
	sr dataobj.SectionReader // supplies the section's extension data, metadata, and data ranges
}
// NewDecoder returns a [Decoder] that reads the section exposed by reader.
// formatVersion must be the format version the section was encoded with.
//
// An error is returned when formatVersion differs from [FormatVersion], the
// only version this decoder accepts.
func NewDecoder(reader dataobj.SectionReader, formatVersion uint32) (*Decoder, error) {
	switch formatVersion {
	case FormatVersion:
		return &Decoder{sr: reader}, nil
	default:
		return nil, fmt.Errorf("unsupported format version %d", formatVersion)
	}
}
// SectionMetadata fetches and decodes the section's metadata. The offset and
// length of the metadata region are read from the section's extension data.
//
// Errors from reading the extension data are returned as-is; errors from
// fetching or decoding the metadata itself are wrapped with context.
func (dec *Decoder) SectionMetadata(ctx context.Context) (*datasetmd.SectionMetadata, error) {
	info, err := dec.getSectionInfo()
	if err != nil {
		return nil, err
	}

	var (
		offset = int64(info.SectionMetadataOffset)
		length = int64(info.SectionMetadataLength)
	)
	body, err := dec.sr.MetadataRange(ctx, offset, length)
	if err != nil {
		return nil, fmt.Errorf("reading section metadata: %w", err)
	}
	defer body.Close()

	// Buffered reader comes from a shared pool; it is returned (before body
	// is closed, per defer LIFO ordering) once decoding is finished.
	buffered := bufpool.GetReader(body)
	defer bufpool.PutReader(buffered)

	md := new(datasetmd.SectionMetadata)
	if err := protocodec.Decode(buffered, md); err != nil {
		return nil, fmt.Errorf("decoding section metadata: %w", err)
	}
	return md, nil
}
// getSectionInfo decodes the [datasetmd.SectionInfoExtension] stored in the
// section's extension data, which records where the section metadata lives.
//
// It returns an error if the extension data is empty or cannot be decoded.
// Decode failures are wrapped with context, consistent with the other decode
// paths in this file.
func (dec *Decoder) getSectionInfo() (*datasetmd.SectionInfoExtension, error) {
	data := dec.sr.ExtensionData()
	if len(data) == 0 {
		return nil, fmt.Errorf("section is missing required extension_data")
	}

	var ext datasetmd.SectionInfoExtension
	if err := protocodec.Decode(bytes.NewReader(data), &ext); err != nil {
		return nil, fmt.Errorf("decoding section info extension: %w", err)
	}
	return &ext, nil
}
// Pages returns the set of pages for the provided columns. The order of slices
// of pages emitted by the iterator matches the order of the columns slice: the
// first slice corresponds to the first column, and so on.
//
// Column metadata for every requested column is fetched up front with a single
// [rangeio.ReadRanges] call before anything is yielded. If fetching or decoding
// fails, the error is emitted from the sequence and iteration stops.
func (dec *Decoder) Pages(ctx context.Context, columns []*datasetmd.ColumnDesc) result.Seq[[]*datasetmd.PageDesc] {
	return result.Iter(func(yield func([]*datasetmd.PageDesc) bool) error {
		ranges := make([]rangeio.Range, 0, len(columns))
		for _, column := range columns {
			ranges = append(ranges, rangeio.Range{
				Data:   make([]byte, column.ColumnMetadataLength),
				Offset: int64(column.ColumnMetadataOffset),
			})
		}

		// Time only the fetch. A deferred observation would also count time
		// spent in yield, which runs the consumer's loop body synchronously,
		// and would inflate the download statistic with caller work.
		region := xcap.RegionFromContext(ctx)
		startTime := time.Now()
		err := rangeio.ReadRanges(ctx, metadataRangeReader{Inner: dec.sr}, ranges)
		region.Record(xcap.StatDatasetPageDownloadTime.Observe(time.Since(startTime).Seconds()))
		if err != nil {
			return fmt.Errorf("reading column metadata: %w", err)
		}

		for _, r := range ranges {
			var md datasetmd.ColumnMetadata
			if err := protocodec.Decode(bytes.NewReader(r.Data), &md); err != nil {
				return fmt.Errorf("decoding column metadata: %w", err)
			}
			if !yield(md.Pages) {
				return nil
			}
		}
		return nil
	})
}
// metadataRangeReader adapts a [dataobj.SectionReader] to [rangeio.Reader],
// serving each requested range from the section's metadata region.
type metadataRangeReader struct {
	Inner dataobj.SectionReader
}

// The value type is what gets passed to rangeio.ReadRanges, so assert that.
var _ rangeio.Reader = metadataRangeReader{}

// ReadRange fills r.Data with len(r.Data)-sized metadata starting at r.Offset.
// It returns the number of bytes copied, or an error if the range could not be
// opened or was shorter than requested.
func (mr metadataRangeReader) ReadRange(ctx context.Context, r rangeio.Range) (int, error) {
	body, err := mr.Inner.MetadataRange(ctx, r.Offset, r.Len())
	if err != nil {
		return 0, err
	}
	defer body.Close()

	n, err := io.ReadFull(body, r.Data)
	return n, err
}
// ReadPages reads the provided set of pages, iterating over their data matching
// the argument order. If an error is encountered while retrieving pages, an
// error is emitted from the sequence and iteration stops.
//
// The data for every page is fetched up front with a single
// [rangeio.ReadRanges] call, so all page buffers are allocated and filled
// before the first value is yielded.
func (dec *Decoder) ReadPages(ctx context.Context, pages []*datasetmd.PageDesc) result.Seq[dataset.PageData] {
	return result.Iter(func(yield func(dataset.PageData) bool) error {
		ranges := make([]rangeio.Range, 0, len(pages))
		for _, page := range pages {
			ranges = append(ranges, rangeio.Range{
				Data:   make([]byte, page.DataSize),
				Offset: int64(page.DataOffset),
			})
		}

		// Time only the fetch. A deferred observation would also count time
		// spent in yield, which runs the consumer's loop body synchronously,
		// and would inflate the download statistic with caller work.
		region := xcap.RegionFromContext(ctx)
		startTime := time.Now()
		err := rangeio.ReadRanges(ctx, dataRangeReader{Inner: dec.sr}, ranges)
		region.Record(xcap.StatDatasetPageDownloadTime.Observe(time.Since(startTime).Seconds()))
		if err != nil {
			return fmt.Errorf("reading page data: %w", err)
		}

		for _, r := range ranges {
			if !yield(dataset.PageData(r.Data)) {
				return nil
			}
		}
		return nil
	})
}
// dataRangeReader adapts a [dataobj.SectionReader] to [rangeio.Reader],
// serving each requested range from the section's data region.
type dataRangeReader struct {
	Inner dataobj.SectionReader
}

// The value type is what gets passed to rangeio.ReadRanges, so assert that.
var _ rangeio.Reader = dataRangeReader{}

// ReadRange fills r.Data with section data starting at r.Offset. It returns the
// number of bytes copied, or an error if the range could not be opened or was
// shorter than requested.
func (dr dataRangeReader) ReadRange(ctx context.Context, r rangeio.Range) (int, error) {
	body, err := dr.Inner.DataRange(ctx, r.Offset, r.Len())
	if err != nil {
		return 0, err
	}
	defer body.Close()

	n, err := io.ReadFull(body, r.Data)
	return n, err
}