|
|
|
|
@ -1175,7 +1175,7 @@ func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value |
|
|
|
|
|
|
|
|
|
v, ws = ev.eval(ctx, expr) |
|
|
|
|
if ev.enableDelayedNameRemoval { |
|
|
|
|
ev.cleanupMetricLabels(v) |
|
|
|
|
v = ev.cleanupMetricLabels(v) |
|
|
|
|
} |
|
|
|
|
return v, ws, nil |
|
|
|
|
} |
|
|
|
|
@ -3832,7 +3832,7 @@ func (*evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []str |
|
|
|
|
return enh.Out, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (ev *evaluator) cleanupMetricLabels(v parser.Value) { |
|
|
|
|
func (ev *evaluator) cleanupMetricLabels(v parser.Value) parser.Value { |
|
|
|
|
if v.Type() == parser.ValueTypeMatrix { |
|
|
|
|
mat := v.(Matrix) |
|
|
|
|
for i := range mat { |
|
|
|
|
@ -3840,9 +3840,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) { |
|
|
|
|
mat[i].Metric = mat[i].Metric.DropReserved(schema.IsMetadataLabel) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if mat.ContainsSameLabelset() { |
|
|
|
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
|
|
|
} |
|
|
|
|
return ev.mergeSeriesWithSameLabelset(mat) |
|
|
|
|
} else if v.Type() == parser.ValueTypeVector { |
|
|
|
|
vec := v.(Vector) |
|
|
|
|
for i := range vec { |
|
|
|
|
@ -3853,7 +3851,75 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) { |
|
|
|
|
if vec.ContainsSameLabelset() { |
|
|
|
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
|
|
|
} |
|
|
|
|
return vec |
|
|
|
|
} |
|
|
|
|
return v |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// mergeSeriesWithSameLabelset merges series in a matrix that have the same labelset
|
|
|
|
|
// after __name__ label removal. This happens when delayed name removal is enabled and
|
|
|
|
|
// operations like OR combine series that originally had different names but end up
|
|
|
|
|
// with the same labelset after dropping the name. If series with the same labelset
|
|
|
|
|
// have overlapping timestamps, an error is returned.
|
|
|
|
|
func (ev *evaluator) mergeSeriesWithSameLabelset(mat Matrix) Matrix { |
|
|
|
|
if len(mat) <= 1 { |
|
|
|
|
return mat |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Fast path: check if there are any duplicate labelsets without allocating.
|
|
|
|
|
// This is the common case and we want to avoid allocations.
|
|
|
|
|
if !mat.ContainsSameLabelset() { |
|
|
|
|
return mat |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Slow path: there are duplicates, so we need to merge series with non-overlapping timestamps.
|
|
|
|
|
// Group series by their labelset hash.
|
|
|
|
|
seriesByHash := make(map[uint64][]int) |
|
|
|
|
for i := range mat { |
|
|
|
|
hash := mat[i].Metric.Hash() |
|
|
|
|
seriesByHash[hash] = append(seriesByHash[hash], i) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Merge series with the same labelset.
|
|
|
|
|
merged := make(Matrix, 0, len(seriesByHash)) |
|
|
|
|
for _, indices := range seriesByHash { |
|
|
|
|
if len(indices) == 1 { |
|
|
|
|
// No collision, add as-is.
|
|
|
|
|
merged = append(merged, mat[indices[0]]) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Multiple series with the same labelset - merge all samples.
|
|
|
|
|
base := mat[indices[0]] |
|
|
|
|
for _, idx := range indices[1:] { |
|
|
|
|
base.Floats = append(base.Floats, mat[idx].Floats...) |
|
|
|
|
base.Histograms = append(base.Histograms, mat[idx].Histograms...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Sort merged samples by timestamp.
|
|
|
|
|
sort.Slice(base.Floats, func(i, j int) bool { |
|
|
|
|
return base.Floats[i].T < base.Floats[j].T |
|
|
|
|
}) |
|
|
|
|
sort.Slice(base.Histograms, func(i, j int) bool { |
|
|
|
|
return base.Histograms[i].T < base.Histograms[j].T |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
// Check for duplicate timestamps in sorted samples.
|
|
|
|
|
for i := 1; i < len(base.Floats); i++ { |
|
|
|
|
if base.Floats[i].T == base.Floats[i-1].T { |
|
|
|
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for i := 1; i < len(base.Histograms); i++ { |
|
|
|
|
if base.Histograms[i].T == base.Histograms[i-1].T { |
|
|
|
|
ev.errorf("vector cannot contain metrics with the same labelset") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
merged = append(merged, base) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return merged |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) { |
|
|
|
|
|