mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
130 lines
3.9 KiB
130 lines
3.9 KiB
|
1 year ago
|
package dataobj
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"fmt"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
|
||
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
|
||
|
|
)
|
||
|
|
|
||
|
|
type decoder struct {
|
||
|
|
rr rangeReader
|
||
|
|
}
|
||
|
|
|
||
|
|
func (d *decoder) Metadata(ctx context.Context) (*filemd.Metadata, error) {
|
||
|
|
tailer, err := d.tailer(ctx)
|
||
|
|
if err != nil {
|
||
|
|
return nil, fmt.Errorf("reading tailer: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
rc, err := d.rr.ReadRange(ctx, int64(tailer.FileSize-tailer.MetadataSize-8), int64(tailer.MetadataSize))
|
||
|
|
if err != nil {
|
||
|
|
return nil, fmt.Errorf("getting metadata: %w", err)
|
||
|
|
}
|
||
|
|
defer rc.Close()
|
||
|
|
|
||
|
|
br := bufpool.GetReader(rc)
|
||
|
|
defer bufpool.PutReader(br)
|
||
|
|
|
||
|
|
return decodeFileMetadata(br)
|
||
|
|
}
|
||
|
|
|
||
|
|
type tailer struct {
|
||
|
|
MetadataSize uint64
|
||
|
|
FileSize uint64
|
||
|
|
}
|
||
|
|
|
||
|
|
func (d *decoder) tailer(ctx context.Context) (tailer, error) {
|
||
|
|
size, err := d.rr.Size(ctx)
|
||
|
|
if err != nil {
|
||
|
|
return tailer{}, fmt.Errorf("reading attributes: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Read the last 8 bytes of the object to get the metadata size and magic.
|
||
|
|
rc, err := d.rr.ReadRange(ctx, size-8, 8)
|
||
|
|
if err != nil {
|
||
|
|
return tailer{}, fmt.Errorf("getting file tailer: %w", err)
|
||
|
|
}
|
||
|
|
defer rc.Close()
|
||
|
|
|
||
|
|
br := bufpool.GetReader(rc)
|
||
|
|
defer bufpool.PutReader(br)
|
||
|
|
|
||
|
|
metadataSize, err := decodeTailer(br)
|
||
|
|
if err != nil {
|
||
|
|
return tailer{}, fmt.Errorf("scanning tailer: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
return tailer{
|
||
|
|
MetadataSize: uint64(metadataSize),
|
||
|
|
FileSize: uint64(size),
|
||
|
|
}, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (d *decoder) SectionReader(metadata *filemd.Metadata, section *filemd.SectionInfo) SectionReader {
|
||
|
|
return §ionReader{rr: d.rr, md: metadata, sec: section}
|
||
|
|
}
|
||
|
|
|
||
|
|
var (
|
||
|
|
errInvalidSectionType = errors.New("invalid section type")
|
||
|
|
errMissingSectionType = errors.New("missing section type")
|
||
|
|
|
||
|
|
legacyKindMapping = map[filemd.SectionKind]SectionType{ //nolint:staticcheck // Ignore deprecation warning
|
||
|
|
filemd.SECTION_KIND_UNSPECIFIED: legacySectionTypeInvalid,
|
||
|
|
filemd.SECTION_KIND_STREAMS: legacySectionTypeStreams,
|
||
|
|
filemd.SECTION_KIND_LOGS: legacySectionTypeLogs,
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
// getSectionType returns the [SectionType] for the given section.
|
||
|
|
// getSectionType is backwards compatible with the old section kind system. The
|
||
|
|
// md argument is used to look up the type in the metadata.
|
||
|
|
//
|
||
|
|
// getSectionType returns an error if the section incorrectly specifies both
|
||
|
|
// the legacy kind and the new type reference.
|
||
|
|
func getSectionType(md *filemd.Metadata, section *filemd.SectionInfo) (SectionType, error) {
|
||
|
|
var (
|
||
|
|
deprecatedKind = section.Kind //nolint:staticcheck // Ignore deprecation warning
|
||
|
|
)
|
||
|
|
|
||
|
|
switch {
|
||
|
|
case deprecatedKind != filemd.SECTION_KIND_UNSPECIFIED && section.TypeRef != 0:
|
||
|
|
return legacySectionTypeInvalid, errors.New("section specifies both legacy kind and new type reference")
|
||
|
|
|
||
|
|
case deprecatedKind != filemd.SECTION_KIND_UNSPECIFIED:
|
||
|
|
typ, ok := legacyKindMapping[deprecatedKind]
|
||
|
|
if !ok {
|
||
|
|
return legacySectionTypeInvalid, errInvalidSectionType
|
||
|
|
}
|
||
|
|
return typ, nil
|
||
|
|
|
||
|
|
case section.TypeRef != 0:
|
||
|
|
if section.TypeRef >= uint32(len(md.Types)) {
|
||
|
|
return legacySectionTypeInvalid, fmt.Errorf("%w: typeRef %d out of bounds [1, %d)", errMissingSectionType, section.TypeRef, len(md.Types))
|
||
|
|
}
|
||
|
|
|
||
|
|
var (
|
||
|
|
rawType = md.Types[section.TypeRef]
|
||
|
|
|
||
|
|
namespaceRef = rawType.NameRef.NamespaceRef
|
||
|
|
kindRef = rawType.NameRef.KindRef
|
||
|
|
)
|
||
|
|
|
||
|
|
// Validate the namespace and kind references.
|
||
|
|
if namespaceRef == 0 || namespaceRef >= uint32(len(md.Dictionary)) {
|
||
|
|
return legacySectionTypeInvalid, fmt.Errorf("%w: namespaceRef %d out of bounds [1, %d)", errMissingSectionType, namespaceRef, len(md.Dictionary))
|
||
|
|
} else if kindRef == 0 || kindRef >= uint32(len(md.Dictionary)) {
|
||
|
|
return legacySectionTypeInvalid, fmt.Errorf("%w: kindRef %d out of bounds [1, %d)", errMissingSectionType, kindRef, len(md.Dictionary))
|
||
|
|
}
|
||
|
|
|
||
|
|
return SectionType{
|
||
|
|
Namespace: md.Dictionary[namespaceRef],
|
||
|
|
Kind: md.Dictionary[kindRef],
|
||
|
|
}, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
return legacySectionTypeInvalid, errMissingSectionType
|
||
|
|
}
|