mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
332 lines
12 KiB
332 lines
12 KiB
package pointers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/pointersmd"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
|
|
)
|
|
|
|
var (
|
|
sectionLabels = prometheus.Labels{"section": sectionType.String()}
|
|
)
|
|
|
|
// Metrics instruments the streams section.
|
|
type Metrics struct {
|
|
encodeSeconds prometheus.Histogram
|
|
recordsTotal prometheus.Counter
|
|
minTimestamp prometheus.Gauge
|
|
maxTimestamp prometheus.Gauge
|
|
|
|
datasetColumnMetadataSize prometheus.Histogram
|
|
datasetColumnMetadataTotalSize prometheus.Histogram
|
|
|
|
datasetColumnCount prometheus.Histogram
|
|
datasetColumnCompressedBytes *prometheus.HistogramVec
|
|
datasetColumnUncompressedBytes *prometheus.HistogramVec
|
|
datasetColumnCompressionRatio *prometheus.HistogramVec
|
|
datasetColumnRows *prometheus.HistogramVec
|
|
datasetColumnValues *prometheus.HistogramVec
|
|
|
|
datasetPageCount *prometheus.HistogramVec
|
|
datasetPageCompressedBytes *prometheus.HistogramVec
|
|
datasetPageUncompressedBytes *prometheus.HistogramVec
|
|
datasetPageCompressionRatio *prometheus.HistogramVec
|
|
datasetPageRows *prometheus.HistogramVec
|
|
datasetPageValues *prometheus.HistogramVec
|
|
}
|
|
|
|
// NewMetrics creates a new set of metrics for the pointers section.
|
|
func NewMetrics() *Metrics {
|
|
return &Metrics{
|
|
encodeSeconds: prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "pointers",
|
|
Name: "encode_seconds",
|
|
|
|
Help: "Time taken encoding pointers section in seconds.",
|
|
|
|
Buckets: prometheus.DefBuckets,
|
|
NativeHistogramBucketFactor: 1.1,
|
|
NativeHistogramMaxBucketNumber: 100,
|
|
NativeHistogramMinResetDuration: time.Hour,
|
|
}),
|
|
|
|
recordsTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "pointers",
|
|
Name: "records_total",
|
|
|
|
Help: "Total number of records in the pointers section.",
|
|
}),
|
|
|
|
minTimestamp: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "pointers",
|
|
Name: "min_timestamp",
|
|
|
|
Help: "The minimum timestamp (in unix seconds) across all pointers; this resets after an encode.",
|
|
}),
|
|
|
|
maxTimestamp: prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "pointers",
|
|
Name: "max_timestamp",
|
|
|
|
Help: "The maximum timestamp (in unix seconds) across all pointers; this resets after an encode.",
|
|
}),
|
|
|
|
datasetColumnMetadataSize: newNativeHistogram(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_metadata_size",
|
|
Help: "Distribution of column metadata size per encoded dataset column.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}),
|
|
|
|
datasetColumnMetadataTotalSize: newNativeHistogram(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_metadata_total_size",
|
|
Help: "Distribution of metadata size across all columns per encoded section.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}),
|
|
|
|
datasetColumnCount: newNativeHistogram(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_count",
|
|
Help: "Distribution of column counts per encoded dataset section.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}),
|
|
|
|
datasetColumnCompressedBytes: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_compressed_bytes",
|
|
Help: "Distribution of compressed bytes per encoded dataset column.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetColumnUncompressedBytes: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_uncompressed_bytes",
|
|
Help: "Distribution of uncompressed bytes per encoded dataset column.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetColumnCompressionRatio: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_compression_ratio",
|
|
Help: "Distribution of compression ratio per encoded dataset column. Not reported when compression is disabled.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type", "compression_type"}),
|
|
|
|
datasetColumnRows: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_rows",
|
|
Help: "Distribution of row counts per encoded dataset column.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetColumnValues: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_column_values",
|
|
Help: "Distribution of value counts per encoded dataset column.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetPageCount: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_page_count",
|
|
Help: "Distribution of page count per encoded dataset column.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetPageCompressedBytes: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_page_compressed_bytes",
|
|
Help: "Distribution of compressed bytes per encoded dataset page.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetPageUncompressedBytes: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_page_uncompressed_bytes",
|
|
Help: "Distribution of uncompressed bytes per encoded dataset page.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetPageCompressionRatio: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_page_compression_ratio",
|
|
Help: "Distribution of compression ratio per encoded dataset page. Not reported when compression is disabled.",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type", "compression_type"}),
|
|
|
|
datasetPageRows: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_page_rows",
|
|
Help: "Distribution of row counts per encoded dataset page",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
|
|
datasetPageValues: newNativeHistogramVec(prometheus.HistogramOpts{
|
|
Namespace: "loki_dataobj",
|
|
Subsystem: "encoding",
|
|
Name: "dataset_page_values",
|
|
Help: "Distribution of value counts per encoded dataset page",
|
|
|
|
ConstLabels: sectionLabels,
|
|
}, []string{"column_type"}),
|
|
}
|
|
}
|
|
|
|
// Register registers metrics to report to reg.
|
|
func (m *Metrics) Register(reg prometheus.Registerer) error {
|
|
var errs []error
|
|
errs = append(errs, reg.Register(m.encodeSeconds))
|
|
errs = append(errs, reg.Register(m.recordsTotal))
|
|
errs = append(errs, reg.Register(m.minTimestamp))
|
|
errs = append(errs, reg.Register(m.maxTimestamp))
|
|
errs = append(errs, reg.Register(m.datasetColumnMetadataSize))
|
|
errs = append(errs, reg.Register(m.datasetColumnMetadataTotalSize))
|
|
errs = append(errs, reg.Register(m.datasetColumnCount))
|
|
errs = append(errs, reg.Register(m.datasetColumnCompressedBytes))
|
|
errs = append(errs, reg.Register(m.datasetColumnUncompressedBytes))
|
|
errs = append(errs, reg.Register(m.datasetColumnCompressionRatio))
|
|
errs = append(errs, reg.Register(m.datasetColumnRows))
|
|
errs = append(errs, reg.Register(m.datasetColumnValues))
|
|
errs = append(errs, reg.Register(m.datasetPageCount))
|
|
errs = append(errs, reg.Register(m.datasetPageCompressedBytes))
|
|
errs = append(errs, reg.Register(m.datasetPageUncompressedBytes))
|
|
errs = append(errs, reg.Register(m.datasetPageCompressionRatio))
|
|
errs = append(errs, reg.Register(m.datasetPageRows))
|
|
errs = append(errs, reg.Register(m.datasetPageValues))
|
|
return errors.Join(errs...)
|
|
}
|
|
|
|
// Unregister unregisters metrics from the provided Registerer.
|
|
func (m *Metrics) Unregister(reg prometheus.Registerer) {
|
|
reg.Unregister(m.encodeSeconds)
|
|
reg.Unregister(m.recordsTotal)
|
|
reg.Unregister(m.minTimestamp)
|
|
reg.Unregister(m.maxTimestamp)
|
|
reg.Unregister(m.datasetColumnMetadataSize)
|
|
reg.Unregister(m.datasetColumnMetadataTotalSize)
|
|
reg.Unregister(m.datasetColumnCount)
|
|
reg.Unregister(m.datasetColumnCompressedBytes)
|
|
reg.Unregister(m.datasetColumnUncompressedBytes)
|
|
reg.Unregister(m.datasetColumnCompressionRatio)
|
|
reg.Unregister(m.datasetColumnRows)
|
|
reg.Unregister(m.datasetColumnValues)
|
|
reg.Unregister(m.datasetPageCount)
|
|
reg.Unregister(m.datasetPageCompressedBytes)
|
|
reg.Unregister(m.datasetPageUncompressedBytes)
|
|
reg.Unregister(m.datasetPageCompressionRatio)
|
|
reg.Unregister(m.datasetPageRows)
|
|
reg.Unregister(m.datasetPageValues)
|
|
}
|
|
|
|
// Observe observes section statistics for a given section.
|
|
func (m *Metrics) Observe(ctx context.Context, section *Section) error {
|
|
dec := newDecoder(section.reader)
|
|
metadata, err := dec.Metadata(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
columnDescs := metadata.GetColumns()
|
|
m.datasetColumnCount.Observe(float64(len(columnDescs)))
|
|
|
|
columnPages, err := result.Collect(dec.Pages(ctx, columnDescs))
|
|
if err != nil {
|
|
return err
|
|
} else if len(columnPages) != len(columnDescs) {
|
|
return fmt.Errorf("expected %d page lists, got %d", len(columnDescs), len(columnPages))
|
|
}
|
|
|
|
// Count metadata sizes across columns.
|
|
{
|
|
var totalColumnMetadataSize int
|
|
for i := range columnDescs {
|
|
columnMetadataSize := proto.Size(&pointersmd.ColumnMetadata{Pages: columnPages[i]})
|
|
m.datasetColumnMetadataSize.Observe(float64(columnMetadataSize))
|
|
totalColumnMetadataSize += columnMetadataSize
|
|
}
|
|
m.datasetColumnMetadataTotalSize.Observe(float64(totalColumnMetadataSize))
|
|
}
|
|
|
|
for i, column := range columnDescs {
|
|
columnType := column.Type.String()
|
|
pages := columnPages[i]
|
|
compression := column.Info.Compression
|
|
|
|
m.datasetColumnCompressedBytes.WithLabelValues(columnType).Observe(float64(column.Info.CompressedSize))
|
|
m.datasetColumnUncompressedBytes.WithLabelValues(columnType).Observe(float64(column.Info.UncompressedSize))
|
|
if compression != datasetmd.COMPRESSION_TYPE_NONE {
|
|
m.datasetColumnCompressionRatio.WithLabelValues(columnType, compression.String()).Observe(float64(column.Info.UncompressedSize) / float64(column.Info.CompressedSize))
|
|
}
|
|
m.datasetColumnRows.WithLabelValues(columnType).Observe(float64(column.Info.RowsCount))
|
|
m.datasetColumnValues.WithLabelValues(columnType).Observe(float64(column.Info.ValuesCount))
|
|
|
|
m.datasetPageCount.WithLabelValues(columnType).Observe(float64(len(pages)))
|
|
|
|
for _, page := range pages {
|
|
m.datasetPageCompressedBytes.WithLabelValues(columnType).Observe(float64(page.Info.CompressedSize))
|
|
m.datasetPageUncompressedBytes.WithLabelValues(columnType).Observe(float64(page.Info.UncompressedSize))
|
|
if compression != datasetmd.COMPRESSION_TYPE_NONE {
|
|
m.datasetPageCompressionRatio.WithLabelValues(columnType, compression.String()).Observe(float64(page.Info.UncompressedSize) / float64(page.Info.CompressedSize))
|
|
}
|
|
m.datasetPageRows.WithLabelValues(columnType).Observe(float64(page.Info.RowsCount))
|
|
m.datasetPageValues.WithLabelValues(columnType).Observe(float64(page.Info.ValuesCount))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func newNativeHistogram(opts prometheus.HistogramOpts) prometheus.Histogram {
|
|
opts.NativeHistogramBucketFactor = 1.1
|
|
opts.NativeHistogramMaxBucketNumber = 100
|
|
opts.NativeHistogramMinResetDuration = time.Hour
|
|
|
|
return prometheus.NewHistogram(opts)
|
|
}
|
|
|
|
func newNativeHistogramVec(opts prometheus.HistogramOpts, labels []string) *prometheus.HistogramVec {
|
|
opts.NativeHistogramBucketFactor = 1.1
|
|
opts.NativeHistogramMaxBucketNumber = 100
|
|
opts.NativeHistogramMinResetDuration = time.Hour
|
|
|
|
return prometheus.NewHistogramVec(opts, labels)
|
|
}
|
|
|