loki: Allow setting a 'step' when querying logs. (#1773)

* Setting a step on log queries will result in similar behavior as metrics, only returning logs at multiples of the step

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* clear up logic a little to make it easier to follow

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* try to protect future us from future us

Signed-off-by: Edward Welch <edward.welch@grafana.com>
pull/1777/head
Ed Welch 5 years ago committed by GitHub
parent a9f25a35d6
commit 934a0b8dc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      pkg/loghttp/params.go
  2. 37
      pkg/loghttp/params_test.go
  3. 34
      pkg/loghttp/query.go
  4. 2
      pkg/loghttp/query_test.go
  5. 32
      pkg/logql/engine.go
  6. 75
      pkg/logql/engine_test.go
  7. 2
      pkg/logql/evaluator.go

@ -57,10 +57,10 @@ func bounds(r *http.Request) (time.Time, time.Time, error) {
return start, end, nil
}
func step(r *http.Request, start, end time.Time) (time.Duration, error) {
func step(r *http.Request) (time.Duration, error) {
value := r.Form.Get("step")
if value == "" {
return time.Duration(defaultQueryRangeStep(start, end)) * time.Second, nil
return 0, nil
}
if d, err := strconv.ParseFloat(value, 64); err == nil {

@ -54,10 +54,10 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
reqPath string
expected *RangeQuery
}{
"should set the default step based on the input time range if the step parameter is not provided": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000",
"should set the default step based on the input time range if the step parameter is not provided on metric query": {
reqPath: "/loki/api/v1/query_range?query=rate({job=\"boring\"}[5m])&start=0&end=3600000000000",
expected: &RangeQuery{
Query: "{}",
Query: "rate({job=\"boring\"}[5m])",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 14 * time.Second,
@ -65,10 +65,21 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
Direction: logproto.BACKWARD,
},
},
"should leave the step at 0 if step is not provided on a log query": {
reqPath: "/loki/api/v1/query_range?query={job=\"boring\"}&start=0&end=3600000000000",
expected: &RangeQuery{
Query: "{job=\"boring\"}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 0,
Limit: 100,
Direction: logproto.BACKWARD,
},
},
"should use the input step parameter if provided as an integer": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5",
reqPath: "/loki/api/v1/query_range?query={job=\"sleeping\"}&start=0&end=3600000000000&step=5",
expected: &RangeQuery{
Query: "{}",
Query: "{job=\"sleeping\"}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 5 * time.Second,
@ -77,9 +88,9 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
},
},
"should use the input step parameter if provided as a float without decimals": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5.000",
reqPath: "/loki/api/v1/query_range?query={state=\"broken\"}&start=0&end=3600000000000&step=5.000",
expected: &RangeQuery{
Query: "{}",
Query: "{state=\"broken\"}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 5 * time.Second,
@ -88,9 +99,9 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
},
},
"should use the input step parameter if provided as a float with decimals": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5.500",
reqPath: "/loki/api/v1/query_range?query={stop=\"start\"}&start=0&end=3600000000000&step=5.500",
expected: &RangeQuery{
Query: "{}",
Query: "{stop=\"start\"}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 5.5 * 1e9,
@ -99,9 +110,9 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
},
},
"should use the input step parameter if provided as a duration in seconds": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5s",
reqPath: "/loki/api/v1/query_range?query={query=\"query\"}&start=0&end=3600000000000&step=5s",
expected: &RangeQuery{
Query: "{}",
Query: "{query=\"query\"}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 5 * time.Second,
@ -110,9 +121,9 @@ func TestHttp_ParseRangeQuery_Step(t *testing.T) {
},
},
"should use the input step parameter if provided as a duration in days": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5d",
reqPath: "/loki/api/v1/query_range?query={foo!=\"foo\"}&start=0&end=3600000000000&step=5d",
expected: &RangeQuery{
Query: "{}",
Query: "{foo!=\"foo\"}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 5 * 24 * 3600 * time.Second,

@ -12,12 +12,13 @@ import (
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)
var (
errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time")
errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
errNegativeStep = errors.New("negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
)
@ -269,19 +270,36 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
return nil, err
}
result.Step, err = step(r, result.Start, result.End)
result.Step, err = step(r)
if err != nil {
return nil, err
}
if result.Step <= 0 {
if result.Step < 0 {
return nil, errNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (result.End.Sub(result.Start) / result.Step) > 11000 {
return nil, errStepTooSmall
// Additional rules are required for the step on metric queries,
// so we need to parse the expression now to see what type it is.
expr, err := logql.ParseExpr(result.Query)
if err != nil {
return nil, err
}
switch expr.(type) {
case logql.LogSelectorExpr:
// This is a no-op we allow the step to be 0 for Log queries
case logql.SampleExpr:
// For metric queries, if the step is 0 apply a default calculation to it to make it non zero
if result.Step == 0 {
result.Step = time.Duration(defaultQueryRangeStep(result.Start, result.End)) * time.Second
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (result.End.Sub(result.Start) / result.Step) > 11000 {
return nil, errStepTooSmall
}
default:
panic("An unexpected query expression type was encountered when applying rules to the query step parameter, " +
"please update query.go with the appropriate rules for handling this new expression type.")
}
return &result, nil

@ -40,7 +40,7 @@ func TestParseRangeQuery(t *testing.T) {
}, nil, true},
{"too small step",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`),
URL: mustParseURL(`?query=rate({foo="bar"}[5m])&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`),
}, nil, true},
{"good",
&http.Request{

@ -199,7 +199,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
return nil, err
}
defer helpers.LogError("closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit)
streams, err := readStreams(iter, q.limit, q.step, q.direction)
return streams, err
}
@ -297,19 +297,33 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M
return promql.Matrix{series}
}
func readStreams(i iter.EntryIterator, size uint32) (Streams, error) {
func readStreams(i iter.EntryIterator, size uint32, step time.Duration, dir logproto.Direction) (Streams, error) {
streams := map[string]*logproto.Stream{}
respSize := uint32(0)
for ; respSize < size && i.Next(); respSize++ {
// lastEntry should be a really old time so that the first comparison is always true, we use a negative
// value here because many unit tests start at time.Unix(0,0)
lastEntry := time.Unix(-100, 0)
for respSize < size && i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
forwardShouldOutput := dir == logproto.FORWARD &&
(i.Entry().Timestamp.Equal(lastEntry.Add(step)) || i.Entry().Timestamp.After(lastEntry.Add(step)))
backwardShouldOutput := dir == logproto.BACKWARD &&
(i.Entry().Timestamp.Equal(lastEntry.Add(-step)) || i.Entry().Timestamp.Before(lastEntry.Add(-step)))
// If step == 0 output every line.
// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward or reverse step
if step == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput {
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
streams[labels] = stream
stream.Entries = append(stream.Entries, entry)
lastEntry = i.Entry().Timestamp
respSize++
}
stream.Entries = append(stream.Entries, entry)
}
result := make([]*logproto.Stream, 0, len(streams))

@ -363,15 +363,45 @@ func TestEngine_NewRangeQuery(t *testing.T) {
Streams([]*logproto.Stream{newStream(10, identity, `{app="foo"}`)}),
},
{
`{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), time.Second, logproto.BACKWARD, 30,
`{app="food"}`, time.Unix(0, 0), time.Unix(30, 0), 2 * time.Second, logproto.FORWARD, 10,
[][]*logproto.Stream{
{newStream(testSize, identity, `{app="food"}`)},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="food"}`}},
},
Streams([]*logproto.Stream{newStepStream(10, 2*time.Second, identity, `{app="food"}`)}),
},
{
`{app="fed"}`, time.Unix(0, 0), time.Unix(30, 0), 2 * time.Second, logproto.BACKWARD, 10,
[][]*logproto.Stream{
{newBackwardStream(testSize, identity, `{app="fed"}`)},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="fed"}`}},
},
Streams([]*logproto.Stream{newBackwardStepStream(testSize, 10, 2*time.Second, identity, `{app="fed"}`)}),
},
{
`{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), time.Second, logproto.FORWARD, 30,
[][]*logproto.Stream{
{newStream(testSize, identity, `{app="bar"}`)},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="bar"}|="foo"|~".+bar"`}},
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="bar"}|="foo"|~".+bar"`}},
},
Streams([]*logproto.Stream{newStream(30, identity, `{app="bar"}`)}),
},
{
`{app="barf"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), 3 * time.Second, logproto.BACKWARD, 30,
[][]*logproto.Stream{
{newBackwardStream(testSize, identity, `{app="barf"}`)},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="barf"}|="foo"|~".+bar"`}},
},
Streams([]*logproto.Stream{newBackwardStepStream(testSize, 30, 3*time.Second, identity, `{app="barf"}`)}),
},
{
`rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), time.Minute, logproto.BACKWARD, 10,
[][]*logproto.Stream{
@ -1230,6 +1260,47 @@ func newStream(n int64, f generator, labels string) *logproto.Stream {
}
}
func newStepStream(n int64, step time.Duration, f generator, labels string) *logproto.Stream {
entries := []logproto.Entry{}
lastEntry := int64(-100) // Start with a really small value (negative) so we always output the first item
for i := int64(0); int64(len(entries)) < n; i++ {
if float64(lastEntry)+step.Seconds() <= float64(i) {
entries = append(entries, f(i))
lastEntry = i
}
}
return &logproto.Stream{
Entries: entries,
Labels: labels,
}
}
func newBackwardStream(n int64, f generator, labels string) *logproto.Stream {
entries := []logproto.Entry{}
for i := n - 1; i > 0; i-- {
entries = append(entries, f(i))
}
return &logproto.Stream{
Entries: entries,
Labels: labels,
}
}
func newBackwardStepStream(n, expectedResults int64, step time.Duration, f generator, labels string) *logproto.Stream {
entries := []logproto.Entry{}
lastEntry := int64(100000) //Start with some really big value so that we always output the first item
for i := n - 1; int64(len(entries)) < expectedResults; i-- {
if float64(lastEntry)-step.Seconds() >= float64(i) {
entries = append(entries, f(i))
lastEntry = i
}
}
return &logproto.Stream{
Entries: entries,
Labels: labels,
}
}
func identity(i int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(i, 0),

@ -61,7 +61,7 @@ func (p LiteralParams) Direction() logproto.Direction { return p.direction }
// GetRangeType returns whether a query is an instant query or range query
func GetRangeType(q Params) QueryRangeType {
if q.Start() == q.End() && q.Step() == 0 {
if q.Start() == q.End() {
return InstantType
}
return RangeType

Loading…
Cancel
Save