Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/logproto/extensions.go

24 lines
1018 B

package logproto
import "github.com/prometheus/prometheus/model/labels"
Feature/querysharding ii (#1927) * [wip] sharding evaluator/ast * [wip] continues experimenting with ast mapping * refactoring in preparation for binops * evaluators can pass state to other evaluators * compiler alignment * Evaluator method renamed to StepEvaluator * chained evaluator impl * tidying up sharding code * handling for ConcatSampleExpr * downstream iterator * structure for downstreaming asts * outlines sharding optimizations * work on sharding mapper * ast sharding optimizations * test for different logrange positions * shard mapper tests * stronger ast sharding & tests * shardmapper tests for string->string * removes sharding evaluator code * removes unused ctx arg * Revert "removes sharding evaluator code" This reverts commit 55d41b9519da9496e9471f13a5048d903ea04aaa. * interfaces for downstreaming, type conversions * sharding plumbing on frontend * type alignment in queryrange to downstream sharded queriers * downstreaming support for sharding incl storage code * removes chainedevaluator * comment alignment * storage shard injection * speccing out testware for sharding equivalence * [wip] shared engine refactor * sorting streams, sharding eval fixes * downstream evaluator embeds defaultevaluator * other pkgs adopt logql changes * metrics & logs use same middleware instantiation process * wires up shardingware * middleware per metrics/logfilter * empty step populating StepEvaluator promql.Matrix adapter * sharding metrics * log/span injection into sharded engine * sharding metrics avoids multiple instantiation * downstreamhandler tracing * sharding parameterized libsonnet * removes querier replicas * default 32 concurrency for workers * jsonnet correct level override * unquote true in yaml * lowercase error + downstreamEvaluator defaults to embedded defaultEvaluator * makes shardRecorder private * logs query on failed parse * refactors engine to be multi-use, minimizes logger injection, generalizes Query methods, removes Engine interface * basic tests for querysharding mware * [wip] concurrent evaluator * integrates stat propagation into sharding evaluator * splitby histogram * extends le bounds for bytes processed * byte throughput histogram buckets to 40gb * chunk duration mixin * fixes merge w/ field rename * derives logger in sharded engine via ctx & logs some downstream evaluators * moves sharded engine to top, adds comments * logs failed merge results in stats ctx * snapshotting stats merge logic is done more effectively * per query concurrency controlled via downstreamer * unexports decodereq * queryrange testware * downstreamer tests * pr requests
6 years ago
// Note, this is not very efficient and use should be minimized as it requires label construction on each comparison
type SeriesIdentifiers []SeriesIdentifier
func (ids SeriesIdentifiers) Len() int { return len(ids) }
func (ids SeriesIdentifiers) Swap(i, j int) { ids[i], ids[j] = ids[j], ids[i] }
func (ids SeriesIdentifiers) Less(i, j int) bool {
a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels)
return labels.Compare(a, b) <= 0
}
Feature/querysharding ii (#1927) * [wip] sharding evaluator/ast * [wip] continues experimenting with ast mapping * refactoring in preparation for binops * evaluators can pass state to other evaluators * compiler alignment * Evaluator method renamed to StepEvaluator * chained evaluator impl * tidying up sharding code * handling for ConcatSampleExpr * downstream iterator * structure for downstreaming asts * outlines sharding optimizations * work on sharding mapper * ast sharding optimizations * test for different logrange positions * shard mapper tests * stronger ast sharding & tests * shardmapper tests for string->string * removes sharding evaluator code * removes unused ctx arg * Revert "removes sharding evaluator code" This reverts commit 55d41b9519da9496e9471f13a5048d903ea04aaa. * interfaces for downstreaming, type conversions * sharding plumbing on frontend * type alignment in queryrange to downstream sharded queriers * downstreaming support for sharding incl storage code * removes chainedevaluator * comment alignment * storage shard injection * speccing out testware for sharding equivalence * [wip] shared engine refactor * sorting streams, sharding eval fixes * downstream evaluator embeds defaultevaluator * other pkgs adopt logql changes * metrics & logs use same middleware instantiation process * wires up shardingware * middleware per metrics/logfilter * empty step populating StepEvaluator promql.Matrix adapter * sharding metrics * log/span injection into sharded engine * sharding metrics avoids multiple instantiation * downstreamhandler tracing * sharding parameterized libsonnet * removes querier replicas * default 32 concurrency for workers * jsonnet correct level override * unquote true in yaml * lowercase error + downstreamEvaluator defaults to embedded defaultEvaluator * makes shardRecorder private * logs query on failed parse * refactors engine to be multi-use, minimizes logger injection, generalizes Query methods, removes Engine interface * basic tests for querysharding mware * [wip] concurrent evaluator * integrates stat propagation into sharding evaluator * splitby histogram * extends le bounds for bytes processed * byte throughput histogram buckets to 40gb * chunk duration mixin * fixes merge w/ field rename * derives logger in sharded engine via ctx & logs some downstream evaluators * moves sharded engine to top, adds comments * logs failed merge results in stats ctx * snapshotting stats merge logic is done more effectively * per query concurrency controlled via downstreamer * unexports decodereq * queryrange testware * downstreamer tests * pr requests
6 years ago
type Streams []Stream
func (xs Streams) Len() int { return len(xs) }
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }
func (s Series) Len() int { return len(s.Samples) }
func (s Series) Swap(i, j int) { s.Samples[i], s.Samples[j] = s.Samples[j], s.Samples[i] }
func (s Series) Less(i, j int) bool { return s.Samples[i].Timestamp < s.Samples[j].Timestamp }