mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
402 lines
11 KiB
402 lines
11 KiB
package pattern
|
|
|
|
import (
|
|
"context"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
|
|
"github.com/grafana/loki/v3/pkg/loghttp/push"
|
|
"github.com/grafana/loki/v3/pkg/logproto"
|
|
"github.com/grafana/loki/v3/pkg/pattern/aggregation"
|
|
"github.com/grafana/loki/v3/pkg/pattern/drain"
|
|
"github.com/grafana/loki/v3/pkg/pattern/iter"
|
|
"github.com/grafana/loki/v3/pkg/util/constants"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
)
|
|
|
|
type stream struct {
|
|
fp model.Fingerprint
|
|
labels labels.Labels
|
|
labelsString string
|
|
labelHash uint64
|
|
patterns map[string]*drain.Drain
|
|
mtx sync.Mutex
|
|
logger log.Logger
|
|
patternWriter aggregation.EntryWriter
|
|
aggregationMetrics *aggregation.Metrics
|
|
instanceID string
|
|
|
|
lastTS int64
|
|
persistenceGranularity time.Duration
|
|
sampleInterval time.Duration
|
|
patternRateThreshold float64
|
|
volumeThreshold float64
|
|
}
|
|
|
|
func newStream(
|
|
fp model.Fingerprint,
|
|
ls labels.Labels,
|
|
metrics *ingesterMetrics,
|
|
logger log.Logger,
|
|
guessedFormat string,
|
|
instanceID string,
|
|
drainCfg *drain.Config,
|
|
limits Limits,
|
|
patternWriter aggregation.EntryWriter,
|
|
aggregationMetrics *aggregation.Metrics,
|
|
volumeThreshold float64,
|
|
) (*stream, error) {
|
|
linesSkipped, err := metrics.linesSkipped.CurryWith(prometheus.Labels{"tenant": instanceID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
patterns := make(map[string]*drain.Drain, len(constants.LogLevels))
|
|
for _, lvl := range constants.LogLevels {
|
|
patterns[lvl] = drain.New(instanceID, drainCfg, limits, guessedFormat, &drain.Metrics{
|
|
PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"),
|
|
PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"),
|
|
PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat),
|
|
LinesSkipped: linesSkipped,
|
|
TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),
|
|
StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat),
|
|
})
|
|
}
|
|
|
|
// Get per-tenant persistence granularity (requires casting drainLimits to Limits interface)
|
|
persistenceGranularity := limits.PersistenceGranularity(instanceID)
|
|
if persistenceGranularity == 0 {
|
|
persistenceGranularity = drainCfg.MaxChunkAge
|
|
}
|
|
|
|
return &stream{
|
|
fp: fp,
|
|
labels: ls,
|
|
labelsString: ls.String(),
|
|
labelHash: labels.StableHash(ls),
|
|
logger: logger,
|
|
patterns: patterns,
|
|
patternWriter: patternWriter,
|
|
aggregationMetrics: aggregationMetrics,
|
|
instanceID: instanceID,
|
|
persistenceGranularity: persistenceGranularity,
|
|
sampleInterval: drainCfg.SampleInterval,
|
|
patternRateThreshold: limits.PatternRateThreshold(instanceID),
|
|
volumeThreshold: volumeThreshold,
|
|
}, nil
|
|
}
|
|
|
|
func (s *stream) Push(
|
|
_ context.Context,
|
|
entries []logproto.Entry,
|
|
) error {
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
|
|
for _, entry := range entries {
|
|
if entry.Timestamp.UnixNano() < s.lastTS {
|
|
continue
|
|
}
|
|
|
|
metadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
|
|
lvl := constants.LogLevelUnknown
|
|
if metadata.Has(constants.LevelLabel) {
|
|
lvl = strings.ToLower(metadata.Get(constants.LevelLabel))
|
|
}
|
|
s.lastTS = entry.Timestamp.UnixNano()
|
|
|
|
//TODO(twhitney): Can we reduce lock contention by locking by level rather than for the entire stream?
|
|
if pattern, ok := s.patterns[lvl]; ok {
|
|
pattern.Train(entry.Line, entry.Timestamp.UnixNano())
|
|
} else {
|
|
// since we're defaulting the level to unknown above, we should never get here.
|
|
s.patterns[constants.LogLevelUnknown].Train(entry.Line, entry.Timestamp.UnixNano())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODO(twhitney): Allow a level to be specified for the iterator. Requires a change to the query API.
|
|
func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (iter.Iterator, error) {
|
|
// todo we should improve locking.
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
|
|
iters := []iter.Iterator{}
|
|
for lvl, pattern := range s.patterns {
|
|
clusters := pattern.Clusters()
|
|
for _, cluster := range clusters {
|
|
if cluster.String() == "" {
|
|
continue
|
|
}
|
|
iters = append(iters, cluster.Iterator(lvl, from, through, step, model.Time(s.sampleInterval.Milliseconds())))
|
|
}
|
|
}
|
|
|
|
return iter.NewMerge(iters...), nil
|
|
}
|
|
|
|
// Collect all clusters with their metadata for filtering
|
|
type clusterWithMeta struct {
|
|
cluster *drain.LogCluster
|
|
level string
|
|
drainInstance *drain.Drain
|
|
prunedSamples []*logproto.PatternSample
|
|
}
|
|
|
|
func (s *stream) prune(olderThan time.Duration) bool {
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
|
|
var allClusters []clusterWithMeta
|
|
|
|
// First pass: collect all clusters and prune samples
|
|
totalClusters := 0
|
|
for lvl, pattern := range s.patterns {
|
|
clusters := pattern.Clusters()
|
|
for _, cluster := range clusters {
|
|
prunedSamples := cluster.Prune(olderThan)
|
|
if len(prunedSamples) > 0 {
|
|
allClusters = append(allClusters, clusterWithMeta{
|
|
cluster: cluster,
|
|
level: lvl,
|
|
drainInstance: pattern,
|
|
prunedSamples: prunedSamples,
|
|
})
|
|
}
|
|
if cluster.Size == 0 {
|
|
pattern.Delete(cluster)
|
|
}
|
|
// Clear empty branches and track total clusters
|
|
pattern.Prune()
|
|
totalClusters += len(pattern.Clusters())
|
|
}
|
|
}
|
|
|
|
// Filter clusters by volume if volumeThreshold is set (< 1.0)
|
|
var clustersToWrite []clusterWithMeta
|
|
if s.volumeThreshold > 0 && s.volumeThreshold < 1.0 && len(allClusters) > 0 {
|
|
// Sort clusters by volume, and keep only the top threshold of clusters by volume
|
|
// To optimize memory, filterClustersByVolume will mutate the input slice, the slice we get
|
|
// in rerturn uses the same underlying array as the input slice.
|
|
clustersToWrite = filterClustersByVolume(allClusters, s.volumeThreshold)
|
|
} else {
|
|
// No filtering, write all clusters
|
|
clustersToWrite = allClusters
|
|
}
|
|
|
|
// Write patterns for filtered clusters
|
|
for _, cm := range clustersToWrite {
|
|
s.writePatternsBucketed(cm.prunedSamples, s.labels, cm.cluster.String(), cm.level)
|
|
}
|
|
|
|
// Update active patterns gauge
|
|
s.updatePatternsActiveGauge()
|
|
|
|
return totalClusters == 0
|
|
}
|
|
|
|
// updatePatternsActiveGauge updates the active patterns gauge with the current cluster count
|
|
func (s *stream) updatePatternsActiveGauge() {
|
|
if s.aggregationMetrics == nil {
|
|
return
|
|
}
|
|
|
|
// Count total clusters across all levels
|
|
totalClusters := 0
|
|
for _, pattern := range s.patterns {
|
|
totalClusters += len(pattern.Clusters())
|
|
}
|
|
|
|
s.aggregationMetrics.PatternsActive.WithLabelValues(s.instanceID).Set(float64(totalClusters))
|
|
}
|
|
|
|
func (s *stream) flush() {
|
|
// Flush all patterns by pruning everything older than 0 (i.e., everything)
|
|
s.prune(0)
|
|
}
|
|
|
|
func (s *stream) writePattern(
|
|
ts model.Time,
|
|
streamLbls labels.Labels,
|
|
pattern string,
|
|
count int64,
|
|
lvl string,
|
|
) {
|
|
service := streamLbls.Get(push.LabelServiceName)
|
|
if service == "" {
|
|
service = push.ServiceUnknown
|
|
}
|
|
|
|
newLbls := labels.FromStrings(constants.PatternLabel, service)
|
|
|
|
newStructuredMetadata := []logproto.LabelAdapter{
|
|
{Name: constants.LevelLabel, Value: lvl},
|
|
}
|
|
|
|
if s.patternWriter != nil {
|
|
patternEntry := aggregation.PatternEntry(ts.Time(), count, pattern, streamLbls)
|
|
|
|
// Record metrics
|
|
if s.aggregationMetrics != nil {
|
|
// Increment pattern writes counter
|
|
s.aggregationMetrics.PatternWritesTotal.WithLabelValues(s.instanceID).Inc()
|
|
|
|
// Record pattern entry size
|
|
entrySize := len(patternEntry)
|
|
s.aggregationMetrics.PatternBytesWrittenTotal.WithLabelValues(s.instanceID).Add(float64(entrySize))
|
|
}
|
|
|
|
s.patternWriter.WriteEntry(
|
|
ts.Time(),
|
|
patternEntry,
|
|
newLbls,
|
|
newStructuredMetadata,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (s *stream) writePatternsBucketed(
|
|
prunedSamples []*logproto.PatternSample,
|
|
streamLbls labels.Labels,
|
|
pattern string,
|
|
lvl string,
|
|
) {
|
|
if len(prunedSamples) == 0 {
|
|
return
|
|
}
|
|
|
|
// Calculate bucket size
|
|
bucketSize := s.persistenceGranularity
|
|
|
|
// Process samples into buckets
|
|
buckets := make(map[model.Time][]*logproto.PatternSample)
|
|
|
|
for _, sample := range prunedSamples {
|
|
// Calculate which bucket this sample belongs to
|
|
sampleBucket := model.Time(sample.Timestamp.UnixNano() / bucketSize.Nanoseconds() * bucketSize.Nanoseconds() / 1e6)
|
|
buckets[sampleBucket] = append(buckets[sampleBucket], sample)
|
|
}
|
|
|
|
// Write pattern entries for each bucket (apply rate threshold per bucket)
|
|
for bucketTime, bucketSamples := range buckets {
|
|
if len(bucketSamples) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Check if pattern rate meets threshold,
|
|
// threshold of 0 means no rate threshold
|
|
if s.patternRateThreshold > 0 {
|
|
rate := s.calculatePatternRate(bucketSamples)
|
|
if rate < s.patternRateThreshold {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Calculate total value for this bucket
|
|
var totalValue int64
|
|
for _, sample := range bucketSamples {
|
|
totalValue += sample.Value
|
|
}
|
|
|
|
if totalValue > 0 {
|
|
s.writePattern(bucketTime, streamLbls, pattern, totalValue, lvl)
|
|
}
|
|
}
|
|
}
|
|
|
|
// calculatePatternRate calculates a per second rate of samples in a bucket.
|
|
func (s *stream) calculatePatternRate(samples []*logproto.PatternSample) float64 {
|
|
if len(samples) == 0 {
|
|
return 0.0
|
|
}
|
|
|
|
if len(samples) == 1 {
|
|
return 0.0
|
|
}
|
|
|
|
// Calculate total count and time span
|
|
var totalCount int64
|
|
var minTime, maxTime model.Time
|
|
|
|
for i, sample := range samples {
|
|
totalCount += sample.Value
|
|
if i == 0 {
|
|
minTime = sample.Timestamp
|
|
maxTime = sample.Timestamp
|
|
} else {
|
|
if sample.Timestamp < minTime {
|
|
minTime = sample.Timestamp
|
|
}
|
|
if sample.Timestamp > maxTime {
|
|
maxTime = sample.Timestamp
|
|
}
|
|
}
|
|
}
|
|
|
|
// Calculate time span in seconds
|
|
timeSpanSeconds := float64(maxTime.Sub(minTime)) / float64(time.Second)
|
|
|
|
if timeSpanSeconds == 0 {
|
|
return 0.0
|
|
}
|
|
|
|
// Return samples per second
|
|
return float64(totalCount) / timeSpanSeconds
|
|
}
|
|
|
|
// filterClustersByVolume sorts clusters in-place by volume and returns the number of clusters
|
|
// to keep to represent the top X% of total volume. This mutates the input slice, and returns
|
|
// a filtered slice that utilizes the same underlying array as the input slice.
|
|
func filterClustersByVolume(clusters []clusterWithMeta, threshold float64) []clusterWithMeta {
|
|
if len(clusters) == 0 {
|
|
return []clusterWithMeta{}
|
|
}
|
|
|
|
// Handle threshold of 0 - keep no clusters
|
|
if threshold == 0 {
|
|
return []clusterWithMeta{}
|
|
}
|
|
|
|
var totalVolume int64
|
|
for _, cluster := range clusters {
|
|
totalVolume += cluster.cluster.Volume
|
|
}
|
|
|
|
// Sort clusters by volume in descending order (in-place)
|
|
slices.SortFunc(clusters, func(i, j clusterWithMeta) int {
|
|
if i.cluster.Volume > j.cluster.Volume {
|
|
return -1 // Higher volume first
|
|
}
|
|
if i.cluster.Volume < j.cluster.Volume {
|
|
return 1 // Lower volume last
|
|
}
|
|
return 0
|
|
})
|
|
|
|
if totalVolume == 0 {
|
|
return []clusterWithMeta{}
|
|
}
|
|
|
|
// Find how many clusters to keep for the threshold
|
|
targetVolume := int64(float64(totalVolume) * threshold)
|
|
var cumulativeVolume int64
|
|
|
|
var i int
|
|
for ; i < len(clusters); i++ {
|
|
cumulativeVolume += clusters[i].cluster.Volume
|
|
if cumulativeVolume >= targetVolume {
|
|
i++ // Include this cluster that pushed us over the threshold
|
|
break
|
|
}
|
|
}
|
|
|
|
return clusters[0:i]
|
|
}
|
|
|