Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/pointers/stats.go

125 lines
3.6 KiB

package pointers
import (
"context"
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/pointersmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
type (
// Stats provides statistics about a streams section.
Stats struct {
UncompressedSize uint64
CompressedSize uint64
MinTimestamp time.Time
MaxTimestamp time.Time
TimestampDistribution []uint64 // Stream count per hour.
Columns []ColumnStats
}
// ColumnStats provides statistics about a column in a section.
ColumnStats struct {
Name string
Type string
ValueType string
RowsCount uint64
Compression string
UncompressedSize uint64
CompressedSize uint64
MetadataOffset uint64
MetadataSize uint64
ValuesCount uint64
Cardinality uint64
Pages []PageStats
}
// PageStats provides statistics about a page in a column.
PageStats struct {
UncompressedSize uint64
CompressedSize uint64
CRC32 uint32
RowsCount uint64
Encoding string
DataOffset uint64
DataSize uint64
ValuesCount uint64
}
)
// ReadStats returns statistics about the streams section. ReadStats returns an
// error if the streams section couldn't be inspected or if the provided ctx is
// canceled.
func ReadStats(ctx context.Context, section *Section) (Stats, error) {
var stats Stats
dec := newDecoder(section.reader)
metadata, err := dec.Metadata(ctx)
if err != nil {
return stats, fmt.Errorf("reading metadata: %w", err)
}
columnDescs := metadata.GetColumns()
pageSets, err := result.Collect(dec.Pages(ctx, columnDescs))
if err != nil {
return stats, fmt.Errorf("reading pages: %w", err)
}
for i, col := range columnDescs {
stats.CompressedSize += col.Info.CompressedSize
stats.UncompressedSize += col.Info.UncompressedSize
switch {
case col.Type == pointersmd.COLUMN_TYPE_MIN_TIMESTAMP && col.Info.Statistics != nil:
var ts dataset.Value
if err := ts.UnmarshalBinary(col.Info.Statistics.MinValue); err != nil {
return stats, fmt.Errorf("unmarshalling min timestamp: %w", err)
}
stats.MinTimestamp = time.Unix(0, ts.Int64())
case col.Type == pointersmd.COLUMN_TYPE_MAX_TIMESTAMP && col.Info.Statistics != nil:
var ts dataset.Value
if err := ts.UnmarshalBinary(col.Info.Statistics.MaxValue); err != nil {
return stats, fmt.Errorf("unmarshalling max timestamp: %w", err)
}
stats.MaxTimestamp = time.Unix(0, ts.Int64())
}
columnStats := ColumnStats{
Name: col.Info.Name,
Type: col.Type.String(),
ValueType: col.Info.ValueType.String(),
RowsCount: col.Info.RowsCount,
Compression: col.Info.Compression.String(),
UncompressedSize: col.Info.UncompressedSize,
CompressedSize: col.Info.CompressedSize,
MetadataOffset: col.Info.MetadataOffset,
MetadataSize: col.Info.MetadataSize,
ValuesCount: col.Info.ValuesCount,
Cardinality: col.Info.Statistics.GetCardinalityCount(),
}
for _, pages := range pageSets[i] {
columnStats.Pages = append(columnStats.Pages, PageStats{
UncompressedSize: pages.Info.UncompressedSize,
CompressedSize: pages.Info.CompressedSize,
CRC32: pages.Info.Crc32,
RowsCount: pages.Info.RowsCount,
Encoding: pages.Info.Encoding.String(),
DataOffset: pages.Info.DataOffset,
DataSize: pages.Info.DataSize,
ValuesCount: pages.Info.ValuesCount,
})
}
stats.Columns = append(stats.Columns, columnStats)
}
return stats, nil
}