Fixes a bug in MatrixStepper when sharding queries. (#3550)

* Fixes a bug in MatrixStepper when sharding queries.

Since we split correctly metric queries, this bug has show itself.
Basically we were not correctly stepping through time. We should always start from start, add the step until the start is after the end.

For more read: https://www.robustperception.io/step-and-query_range

Fixes #3541

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests. for real.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Not easy to get those test working.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/3553/head
Cyril Tovena 5 years ago committed by GitHub
parent 37a7189d4e
commit b11d2effe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/logcli/query/query.go
  2. 2
      pkg/logql/matrix.go
  3. 48
      pkg/logql/matrix_test.go
  4. 6
      pkg/logql/test_utils.go

@ -58,7 +58,6 @@ type Query struct {
// DoQuery executes the query and prints out the results
func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) {
if q.LocalConfig != "" {
if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil {
log.Fatalf("Query failed: %+v", err)
@ -149,7 +148,6 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
}
}
}
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
@ -172,7 +170,6 @@ func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, las
// DoLocalQuery executes the query against the local store using a Loki configuration file.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string) error {
var conf loki.Config
conf.RegisterFlags(flag.CommandLine)
if q.LocalConfig == "" {
@ -255,7 +252,7 @@ func (q *Query) SetInstant(time time.Time) {
}
func (q *Query) isInstant() bool {
return q.Start == q.End
return q.Start == q.End && q.Step == 0
}
func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
@ -369,7 +366,6 @@ func (q *Query) printMatrix(matrix loghttp.Matrix) {
// it gives us more flexibility with regard to output types in the future. initially we are supporting just formatted json but eventually
// we might add output options such as render to an image file on disk
bytes, err := json.MarshalIndent(matrix, "", " ")
if err != nil {
log.Fatalf("Error marshalling matrix: %v", err)
}
@ -379,7 +375,6 @@ func (q *Query) printMatrix(matrix loghttp.Matrix) {
func (q *Query) printVector(vector loghttp.Vector) {
bytes, err := json.MarshalIndent(vector, "", " ")
if err != nil {
log.Fatalf("Error marshalling vector: %v", err)
}
@ -389,7 +384,6 @@ func (q *Query) printVector(vector loghttp.Vector) {
func (q *Query) printScalar(scalar loghttp.Scalar) {
bytes, err := json.MarshalIndent(scalar, "", " ")
if err != nil {
log.Fatalf("Error marshalling scalar: %v", err)
}

@ -33,7 +33,7 @@ func NewMatrixStepper(start, end time.Time, step time.Duration, m promql.Matrix)
func (m *MatrixStepper) Next() (bool, int64, promql.Vector) {
m.ts = m.ts.Add(m.step)
if !m.ts.Before(m.end) {
if m.ts.After(m.end) {
return false, 0, nil
}

@ -20,7 +20,7 @@ func TestMatrixStepper(t *testing.T) {
promql.Series{
Metric: labels.Labels{{Name: "foo", Value: "bar"}},
Points: []promql.Point{
{T: start.UnixNano() / int64(step), V: 0},
{T: start.UnixNano(), V: 0},
{T: start.Add(step).UnixNano() / int64(time.Millisecond), V: 1},
{T: start.Add(2*step).UnixNano() / int64(time.Millisecond), V: 2},
{T: start.Add(3*step).UnixNano() / int64(time.Millisecond), V: 3},
@ -42,11 +42,11 @@ func TestMatrixStepper(t *testing.T) {
expected := []promql.Vector{
{
promql.Sample{
Point: promql.Point{T: start.UnixNano() / int64(step), V: 0},
Point: promql.Point{T: start.UnixNano(), V: 0},
Metric: labels.Labels{{Name: "foo", Value: "bar"}},
},
promql.Sample{
Point: promql.Point{T: start.UnixNano() / int64(step), V: 0},
Point: promql.Point{T: start.UnixNano(), V: 0},
Metric: labels.Labels{{Name: "bazz", Value: "buzz"}},
},
},
@ -100,9 +100,19 @@ func TestMatrixStepper(t *testing.T) {
Metric: labels.Labels{{Name: "bazz", Value: "buzz"}},
},
},
{
promql.Sample{
Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 0},
Metric: labels.Labels{{Name: "foo", Value: "bar"}},
},
promql.Sample{
Point: promql.Point{T: start.Add(6*step).UnixNano() / int64(time.Millisecond), V: 0},
Metric: labels.Labels{{Name: "bazz", Value: "buzz"}},
},
},
}
for i := 0; i < int(end.Sub(start)/step); i++ {
for i := 0; i <= int(end.Sub(start)/step); i++ {
ok, ts, vec := s.Next()
require.Equal(t, ok, true)
require.Equal(t, start.Add(step*time.Duration(i)).UnixNano()/int64(time.Millisecond), ts)
@ -113,3 +123,33 @@ func TestMatrixStepper(t *testing.T) {
require.Equal(t, ok, false)
}
func Test_SingleStepMatrix(t *testing.T) {
var (
start = time.Unix(0, 0)
end = time.Unix(0, 0)
step = time.Second
)
m := promql.Matrix{
promql.Series{
Metric: labels.Labels{},
Points: []promql.Point{
{T: start.UnixNano(), V: 10},
},
},
}
s := NewMatrixStepper(start, end, step, m)
ok, ts, vec := s.Next()
require.True(t, ok)
require.Equal(t, start.UnixNano(), ts)
require.Equal(t, promql.Vector{promql.Sample{
Point: promql.Point{T: start.UnixNano(), V: 10},
Metric: labels.Labels{},
}}, vec)
ok, _, _ = s.Next()
require.False(t, ok)
}

@ -201,7 +201,7 @@ outer:
return iter.NewTimeRangedSampleIterator(
iter.NewMultiSeriesIterator(ctx, filtered),
req.Start.UnixNano(),
req.End.UnixNano(),
req.End.UnixNano()+1,
), nil
}
@ -232,7 +232,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
results = append(results, res)
}
return results, nil
}
// create nStreams of nEntries with labelNames each where each label value
@ -256,7 +255,7 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream
Value: fmt.Sprintf("%d", shard),
})
}
for j := 0; j < nEntries; j++ {
for j := 0; j <= nEntries; j++ {
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, int64(j*int(time.Second))),
Line: fmt.Sprintf("line number: %d", j),
@ -267,7 +266,6 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream
streams = append(streams, stream)
}
return streams
}
func mustParseLabels(s string) labels.Labels {

Loading…
Cancel
Save