diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dbf1c41f2..662b36f71b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [4949](https://github.com/grafana/loki/pull/4949) **ssncferreira**: Add query `queueTime` metric to statistics and metrics.go * [4938](https://github.com/grafana/loki/pull/4938) **DylanGuedes**: Implement ring status page for the distributor * [5023](https://github.com/grafana/loki/pull/5023) **ssncferreira**: Move `querier.split-queries-by-interval` to a per-tenant configuration * [4993](https://github.com/grafana/loki/pull/4926) **thejosephstevens**: Fix parent of wal and wal_cleaner in loki ruler config docs diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index 6ff3802d3f..62a302c988 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -939,6 +939,7 @@ The example belows show all possible statistics returned with their respective d }, "summary": { "bytesProcessedPerSecond": 0, // Total of bytes processed per second + "queueTime": 0, // Total queue time in nanoseconds (int) "execTime": 0, // Total execution time in seconds (float) "linesProcessedPerSecond": 0, // Total lines processed per second "totalBytesProcessed":0, // Total amount of bytes processed overall for this request diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index bc99644c14..af4cf24790 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -567,7 +567,7 @@ func TestChunkStats(t *testing.T) { t.Fatal(err) } // test on a chunk filling up - s := statsCtx.Result(time.Since(first)) + s := statsCtx.Result(time.Since(first), 0) require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed) require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed) @@ -594,7 +594,7 @@ func TestChunkStats(t *testing.T) { if err := it.Close(); err != nil { t.Fatal(err) } - s = statsCtx.Result(time.Since(first)) + s = statsCtx.Result(time.Since(first), 0) require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed) require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed) diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index c381453549..459c39d6b3 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -550,7 +550,7 @@ func Test_DuplicateCount(t *testing.T) { defer it.Close() for it.Next() { } - require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0).TotalDuplicates()) + require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0, 0).TotalDuplicates()) }) } } diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index c77e9c8729..6b411efbfb 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/httpreq" ) var ( @@ -120,7 +121,9 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { data, err := q.Eval(ctx) - statResult := statsCtx.Result(time.Since(start)) + queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) + + statResult := statsCtx.Result(time.Since(start), queueTime) statResult.Log(level.Debug(log)) status := "200" diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 027acbb116..92d8e32acf 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/httpreq" ) var ( @@ -2062,6 +2063,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it func TestEngine_Stats(t *testing.T) { eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits) + queueTime := 2 * time.Nanosecond q := eng.Query(LiteralParams{ qs: `{foo="bar"}`, start: time.Now(), @@ -2069,9 +2071,11 @@ func TestEngine_Stats(t *testing.T) { direction: logproto.BACKWARD, limit: 1000, }) - r, err := q.Exec(user.InjectOrgID(context.Background(), "fake")) + ctx := context.WithValue(context.Background(), httpreq.QueryQueueTimeHTTPHeader, queueTime) + r, err := q.Exec(user.InjectOrgID(ctx, "fake")) require.NoError(t, err) require.Equal(t, int64(1), r.Statistics.TotalDecompressedBytes()) + require.Equal(t, queueTime.Nanoseconds(), r.Statistics.Summary.QueueTime) } type errorIteratorQuerier struct { diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 706784cd6c..8b59440577 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -106,6 +106,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res "returned_lines", returnedLines, "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), + "queue_time", time.Duration(stats.Summary.QueueTime), }...) logValues = append(logValues, tagsToKeyValues(queryTags)...) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 4550ae22ce..0f94eee234 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -76,13 +76,14 @@ func TestLogSlowQuery(t *testing.T) { }, "200", stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 100000, + QueueTime: 2, ExecTime: 25.25, TotalBytesProcessed: 100000, }, }, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}) require.Equal(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB source=logvolhist feature=beta\n", + "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns source=logvolhist feature=beta\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 024de026fb..4f9b944769 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -91,7 +91,7 @@ func (c *Context) Reset() { } // Result calculates the summary based on store and ingester data. -func (c *Context) Result(execTime time.Duration) Result { +func (c *Context) Result(execTime time.Duration, queueTime time.Duration) Result { r := c.result r.Merge(Result{ @@ -101,7 +101,7 @@ func (c *Context) Result(execTime time.Duration) Result { Ingester: c.ingester, }) - r.ComputeSummary(execTime) + r.ComputeSummary(execTime, queueTime) return r } @@ -125,7 +125,7 @@ func JoinIngesters(ctx context.Context, inc Ingester) { } // ComputeSummary compute the summary of the statistics. -func (r *Result) ComputeSummary(execTime time.Duration) { +func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration) { r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes + r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines + @@ -139,6 +139,9 @@ func (r *Result) ComputeSummary(execTime time.Duration) { int64(float64(r.Summary.TotalLinesProcessed) / execTime.Seconds()) } + if queueTime != 0 { + r.Summary.QueueTime = int64(queueTime) + } } func (s *Store) Merge(m Store) { @@ -168,7 +171,8 @@ func (i *Ingester) Merge(m Ingester) { func (r *Result) Merge(m Result) { r.Querier.Merge(m.Querier) r.Ingester.Merge(m.Ingester) - r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime + m.Summary.ExecTime) * float64(time.Second)))) + r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime+m.Summary.ExecTime)*float64(time.Second))), + time.Duration(r.Summary.QueueTime+m.Summary.QueueTime)) } func (r Result) ChunksDownloadTime() time.Duration { @@ -281,5 +285,6 @@ func (s Summary) Log(log log.Logger) { "Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)), "Summary.TotalLinesProcessed", s.TotalLinesProcessed, "Summary.ExecTime", time.Duration(int64(s.ExecTime*float64(time.Second))), + "Summary.QueueTime", time.Duration(s.QueueTime), ) } diff --git a/pkg/logqlmodel/stats/context_test.go b/pkg/logqlmodel/stats/context_test.go index 47552b4e90..bb158e21f1 100644 --- a/pkg/logqlmodel/stats/context_test.go +++ b/pkg/logqlmodel/stats/context_test.go @@ -25,7 +25,7 @@ func TestResult(t *testing.T) { fakeIngesterQuery(ctx) fakeIngesterQuery(ctx) - res := stats.Result(2 * time.Second) + res := stats.Result(2*time.Second, 2*time.Nanosecond) res.Log(util_log.Logger) expected := Result{ Ingester: Ingester{ @@ -61,6 +61,7 @@ func TestResult(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * time.Second.Seconds(), + QueueTime: 2 * time.Nanosecond.Nanoseconds(), BytesProcessedPerSecond: int64(42), LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), @@ -106,6 +107,7 @@ func TestSnapshot_JoinResults(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * time.Second.Seconds(), + QueueTime: 2 * time.Nanosecond.Nanoseconds(), BytesProcessedPerSecond: int64(42), LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), @@ -114,7 +116,7 @@ func TestSnapshot_JoinResults(t *testing.T) { } JoinResults(ctx, expected) - res := statsCtx.Result(2 * time.Second) + res := statsCtx.Result(2*time.Second, 2*time.Nanosecond) require.Equal(t, expected, res) } @@ -177,6 +179,7 @@ func TestResult_Merge(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * time.Second.Seconds(), + QueueTime: 2 * time.Nanosecond.Nanoseconds(), BytesProcessedPerSecond: int64(42), LinesProcessedPerSecond: int64(50), TotalBytesProcessed: int64(84), @@ -223,6 +226,7 @@ func TestResult_Merge(t *testing.T) { }, Summary: Summary{ ExecTime: 2 * 2 * time.Second.Seconds(), + QueueTime: 2 * 2 * time.Nanosecond.Nanoseconds(), BytesProcessedPerSecond: int64(42), // 2 requests at the same pace should give the same bytes/lines per sec LinesProcessedPerSecond: int64(50), TotalBytesProcessed: 2 * int64(84), @@ -234,10 +238,10 @@ func TestResult_Merge(t *testing.T) { func TestReset(t *testing.T) { statsCtx, ctx := NewContext(context.Background()) fakeIngesterQuery(ctx) - res := statsCtx.Result(2 * time.Second) + res := statsCtx.Result(2*time.Second, 2*time.Millisecond) require.NotEmpty(t, res) statsCtx.Reset() - res = statsCtx.Result(0) + res = statsCtx.Result(0, 0) require.Empty(t, res) } diff --git a/pkg/logqlmodel/stats/stats.pb.go b/pkg/logqlmodel/stats/stats.pb.go index 7538f6053b..943280d550 100644 --- a/pkg/logqlmodel/stats/stats.pb.go +++ b/pkg/logqlmodel/stats/stats.pb.go @@ -98,6 +98,8 @@ type Summary struct { TotalLinesProcessed int64 `protobuf:"varint,4,opt,name=totalLinesProcessed,proto3" json:"totalLinesProcessed"` // Execution time in seconds. ExecTime float64 `protobuf:"fixed64,5,opt,name=execTime,proto3" json:"execTime"` + // Queue time in nanoseconds. + QueueTime int64 `protobuf:"varint,6,opt,name=queueTime,proto3" json:"queueTime"` } func (m *Summary) Reset() { *m = Summary{} } @@ -167,6 +169,13 @@ func (m *Summary) GetExecTime() float64 { return 0 } +func (m *Summary) GetQueueTime() int64 { + if m != nil { + return m.QueueTime + } + return 0 +} + type Querier struct { Store Store `protobuf:"bytes,1,opt,name=store,proto3" json:"store"` } @@ -460,51 +469,52 @@ func init() { func init() { proto.RegisterFile("pkg/logqlmodel/stats/stats.proto", fileDescriptor_6cdfe5d2aea33ebb) } var fileDescriptor_6cdfe5d2aea33ebb = []byte{ - // 693 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xbf, 0x6f, 0xd3, 0x40, - 0x14, 0xb6, 0x13, 0xdc, 0x84, 0xa3, 0xb4, 0xe5, 0xaa, 0xd2, 0x00, 0x92, 0x5d, 0x65, 0xea, 0x00, - 0x8d, 0xf8, 0xb1, 0x80, 0xe8, 0xe2, 0x56, 0x48, 0x95, 0x40, 0x94, 0x57, 0x58, 0xd8, 0x1c, 0xe7, - 0x9a, 0x58, 0x75, 0x7c, 0xa9, 0x7f, 0x08, 0xba, 0xb1, 0x31, 0xc2, 0x9f, 0xc1, 0xc2, 0x9f, 0xc0, - 0xde, 0xb1, 0x63, 0x27, 0x8b, 0xba, 0x0b, 0xf2, 0xd4, 0x8d, 0x15, 0xf9, 0x9d, 0x63, 0xd7, 0x17, - 0x47, 0x62, 0xb1, 0xef, 0x7d, 0xdf, 0xfb, 0xde, 0x3b, 0xbf, 0xef, 0xac, 0x23, 0x1b, 0x93, 0xa3, - 0x61, 0xcf, 0xe5, 0xc3, 0x63, 0x77, 0xcc, 0x07, 0xcc, 0xed, 0x05, 0xa1, 0x15, 0x06, 0xe2, 0xb9, - 0x35, 0xf1, 0x79, 0xc8, 0xa9, 0x86, 0xc1, 0xfd, 0x47, 0x43, 0x27, 0x1c, 0x45, 0xfd, 0x2d, 0x9b, - 0x8f, 0x7b, 0x43, 0x3e, 0xe4, 0x3d, 0x64, 0xfb, 0xd1, 0x21, 0x46, 0x18, 0xe0, 0x4a, 0xa8, 0xba, - 0xbf, 0x54, 0xb2, 0x00, 0x2c, 0x88, 0xdc, 0x90, 0x3e, 0x27, 0xad, 0x20, 0x1a, 0x8f, 0x2d, 0xff, - 0xa4, 0xa3, 0x6e, 0xa8, 0x9b, 0xb7, 0x9e, 0x2c, 0x6d, 0x89, 0xfa, 0x07, 0x02, 0x35, 0x97, 0x4f, - 0x63, 0x43, 0x49, 0x63, 0x63, 0x9a, 0x06, 0xd3, 0x45, 0x26, 0x3d, 0x8e, 0x98, 0xef, 0x30, 0xbf, - 0xd3, 0xa8, 0x48, 0xdf, 0x09, 0xb4, 0x94, 0xe6, 0x69, 0x30, 0x5d, 0xd0, 0x6d, 0xd2, 0x76, 0xbc, - 0x21, 0x0b, 0x42, 0xe6, 0x77, 0x9a, 0xa8, 0x5d, 0xce, 0xb5, 0x7b, 0x39, 0x6c, 0xae, 0xe4, 0xe2, - 0x22, 0x11, 0x8a, 0x55, 0xf7, 0x6f, 0x83, 0xb4, 0xf2, 0xfd, 0xd1, 0x0f, 0x64, 0xbd, 0x7f, 0x12, - 0xb2, 0x60, 0xdf, 0xe7, 0x36, 0x0b, 0x02, 0x36, 0xd8, 0x67, 0xfe, 0x01, 0xb3, 0xb9, 0x37, 0xc0, - 0x0f, 0x6a, 0x9a, 0x0f, 0xd2, 0xd8, 0x98, 0x97, 0x02, 0xf3, 0x88, 0xac, 0xac, 0xeb, 0x78, 0xb5, - 0x65, 0x1b, 0x65, 0xd9, 0x39, 0x29, 0x30, 0x8f, 0xa0, 0x7b, 0x64, 0x35, 0xe4, 0xa1, 0xe5, 0x9a, - 0x95, 0xb6, 0x38, 0x83, 0xa6, 0xb9, 0x9e, 0xc6, 0x46, 0x1d, 0x0d, 0x75, 0x60, 0x51, 0xea, 0x75, - 0xa5, 0x55, 0xe7, 0x86, 0x54, 0xaa, 0x4a, 0x43, 0x1d, 0x48, 0x37, 0x49, 0x9b, 0x7d, 0x66, 0xf6, - 0x7b, 0x67, 0xcc, 0x3a, 0xda, 0x86, 0xba, 0xa9, 0x9a, 0x8b, 0xd9, 0xe4, 0xa7, 0x18, 0x14, 0xab, - 0xee, 0x4b, 0xd2, 0xca, 0xdd, 0xa5, 0x8f, 0x89, 0x16, 0x84, 0xdc, 0x67, 0xf9, 0xb9, 0x59, 0x9c, - 0x9e, 0x9b, 0x0c, 0x33, 0x6f, 0xe7, 0xee, 0x89, 0x14, 0x10, 0xaf, 0xee, 0xcf, 0x06, 0x69, 0x4f, - 0x0d, 0xa6, 0xcf, 0xc8, 0x22, 0xee, 0x05, 0x98, 0x65, 0x8f, 0x98, 0x70, 0x4b, 0x33, 0x57, 0xd2, - 0xd8, 0xa8, 0xe0, 0x50, 0x89, 0xe8, 0x2b, 0x42, 0x31, 0xde, 0x19, 0x45, 0xde, 0x51, 0xf0, 0xc6, - 0x0a, 0x51, 0x2b, 0x2c, 0xb9, 0x9b, 0xc6, 0x46, 0x0d, 0x0b, 0x35, 0x58, 0xd1, 0xdd, 0xc4, 0x38, - 0xc8, 0x1d, 0x28, 0xbb, 0xe7, 0x38, 0x54, 0x22, 0xfa, 0x82, 0x2c, 0x95, 0xf3, 0x3b, 0x60, 0x5e, - 0x98, 0x8f, 0x9b, 0xa6, 0xb1, 0x21, 0x31, 0x20, 0xc5, 0xe5, 0xbc, 0xb4, 0xff, 0x9e, 0xd7, 0xb7, - 0x06, 0xd1, 0x90, 0x2f, 0x1a, 0x8b, 0x8f, 0x00, 0x76, 0x98, 0x1f, 0xee, 0xb2, 0x71, 0xc1, 0x80, - 0x14, 0xd3, 0xb7, 0x64, 0xed, 0x1a, 0xb2, 0xcb, 0x3f, 0x79, 0x2e, 0xb7, 0x06, 0xc5, 0xd4, 0xee, - 0xa5, 0xb1, 0x51, 0x9f, 0x00, 0xf5, 0x70, 0xe6, 0x81, 0x5d, 0xc1, 0xf0, 0xe0, 0x34, 0x4b, 0x0f, - 0x66, 0x59, 0xa8, 0xc1, 0xb2, 0x89, 0x20, 0x8a, 0x43, 0x2c, 0x27, 0x82, 0xfd, 0xca, 0x89, 0x60, - 0x0a, 0x88, 0x57, 0xf7, 0x6b, 0x93, 0x68, 0xc8, 0x67, 0x13, 0x19, 0x31, 0x6b, 0x20, 0x92, 0xb3, - 0x3f, 0xe3, 0xba, 0x15, 0x55, 0x06, 0xa4, 0xb8, 0xa2, 0x45, 0x83, 0xd0, 0x13, 0x59, 0x8b, 0x0c, - 0x48, 0x31, 0xdd, 0x21, 0x77, 0x06, 0xcc, 0xe6, 0xe3, 0x89, 0x8f, 0xff, 0x8e, 0x68, 0xbd, 0x80, - 0xf2, 0xb5, 0x34, 0x36, 0x66, 0x49, 0x98, 0x85, 0xe4, 0x22, 0x62, 0x0f, 0xad, 0xfa, 0x22, 0x62, - 0x1b, 0xb3, 0x10, 0xdd, 0x26, 0xcb, 0xf2, 0x3e, 0xda, 0x58, 0x62, 0x35, 0x8d, 0x0d, 0x99, 0x02, - 0x19, 0xc8, 0xe4, 0x68, 0xef, 0x6e, 0x34, 0x71, 0x1d, 0xdb, 0xca, 0xe4, 0x37, 0x4b, 0xb9, 0x44, - 0x81, 0x0c, 0x98, 0xfd, 0xb3, 0x0b, 0x5d, 0x39, 0xbf, 0xd0, 0x95, 0xab, 0x0b, 0x5d, 0xfd, 0x92, - 0xe8, 0xea, 0x8f, 0x44, 0x57, 0x4f, 0x13, 0x5d, 0x3d, 0x4b, 0x74, 0xf5, 0x77, 0xa2, 0xab, 0x7f, - 0x12, 0x5d, 0xb9, 0x4a, 0x74, 0xf5, 0xfb, 0xa5, 0xae, 0x9c, 0x5d, 0xea, 0xca, 0xf9, 0xa5, 0xae, - 0x7c, 0x7c, 0x78, 0xfd, 0xa2, 0xf2, 0xad, 0x43, 0xcb, 0xb3, 0x7a, 0x2e, 0x3f, 0x72, 0x7a, 0x75, - 0x37, 0x5d, 0x7f, 0x01, 0xaf, 0xab, 0xa7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2e, 0xbd, 0xac, - 0xdd, 0x08, 0x07, 0x00, 0x00, + // 717 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x4d, 0x6f, 0xd3, 0x4a, + 0x14, 0xb5, 0x93, 0xe7, 0x26, 0x9d, 0xd7, 0xaf, 0x37, 0x55, 0x5f, 0xf3, 0x1e, 0x92, 0x5d, 0x65, + 0x55, 0x09, 0x68, 0xc4, 0xc7, 0x06, 0x44, 0x37, 0x6e, 0x85, 0x54, 0x09, 0x44, 0xb9, 0x85, 0x0d, + 0x3b, 0xc7, 0x99, 0x26, 0x56, 0x1d, 0x4f, 0xea, 0x0f, 0x41, 0x77, 0xec, 0x58, 0xc2, 0x8f, 0x60, + 0xc1, 0x86, 0x9f, 0xc0, 0xbe, 0xcb, 0x2e, 0xbb, 0xb2, 0xa8, 0xbb, 0x41, 0x5e, 0xf5, 0x27, 0x20, + 0xdf, 0x71, 0xec, 0x7a, 0xe2, 0x48, 0x6c, 0x92, 0xb9, 0xe7, 0xdc, 0x73, 0xef, 0xf8, 0x9e, 0x19, + 0x0d, 0xd9, 0x9a, 0x9c, 0x0c, 0x7b, 0x2e, 0x1f, 0x9e, 0xba, 0x63, 0x3e, 0x60, 0x6e, 0x2f, 0x08, + 0xad, 0x30, 0x10, 0xbf, 0x3b, 0x13, 0x9f, 0x87, 0x9c, 0x6a, 0x18, 0xfc, 0x7f, 0x7f, 0xe8, 0x84, + 0xa3, 0xa8, 0xbf, 0x63, 0xf3, 0x71, 0x6f, 0xc8, 0x87, 0xbc, 0x87, 0x6c, 0x3f, 0x3a, 0xc6, 0x08, + 0x03, 0x5c, 0x09, 0x55, 0xf7, 0x87, 0x4a, 0x16, 0x80, 0x05, 0x91, 0x1b, 0xd2, 0x27, 0xa4, 0x15, + 0x44, 0xe3, 0xb1, 0xe5, 0x9f, 0x75, 0xd4, 0x2d, 0x75, 0xfb, 0xef, 0x87, 0x2b, 0x3b, 0xa2, 0xfe, + 0x91, 0x40, 0xcd, 0xd5, 0xf3, 0xd8, 0x50, 0xd2, 0xd8, 0x98, 0xa6, 0xc1, 0x74, 0x91, 0x49, 0x4f, + 0x23, 0xe6, 0x3b, 0xcc, 0xef, 0x34, 0x2a, 0xd2, 0xd7, 0x02, 0x2d, 0xa5, 0x79, 0x1a, 0x4c, 0x17, + 0x74, 0x97, 0xb4, 0x1d, 0x6f, 0xc8, 0x82, 0x90, 0xf9, 0x9d, 0x26, 0x6a, 0x57, 0x73, 0xed, 0x41, + 0x0e, 0x9b, 0x6b, 0xb9, 0xb8, 0x48, 0x84, 0x62, 0xd5, 0xfd, 0xda, 0x24, 0xad, 0x7c, 0x7f, 0xf4, + 0x2d, 0xd9, 0xec, 0x9f, 0x85, 0x2c, 0x38, 0xf4, 0xb9, 0xcd, 0x82, 0x80, 0x0d, 0x0e, 0x99, 0x7f, + 0xc4, 0x6c, 0xee, 0x0d, 0xf0, 0x83, 0x9a, 0xe6, 0x9d, 0x34, 0x36, 0xe6, 0xa5, 0xc0, 0x3c, 0x22, + 0x2b, 0xeb, 0x3a, 0x5e, 0x6d, 0xd9, 0x46, 0x59, 0x76, 0x4e, 0x0a, 0xcc, 0x23, 0xe8, 0x01, 0x59, + 0x0f, 0x79, 0x68, 0xb9, 0x66, 0xa5, 0x2d, 0xce, 0xa0, 0x69, 0x6e, 0xa6, 0xb1, 0x51, 0x47, 0x43, + 0x1d, 0x58, 0x94, 0x7a, 0x51, 0x69, 0xd5, 0xf9, 0x4b, 0x2a, 0x55, 0xa5, 0xa1, 0x0e, 0xa4, 0xdb, + 0xa4, 0xcd, 0x3e, 0x30, 0xfb, 0x8d, 0x33, 0x66, 0x1d, 0x6d, 0x4b, 0xdd, 0x56, 0xcd, 0xa5, 0x6c, + 0xf2, 0x53, 0x0c, 0x8a, 0x15, 0xbd, 0x4b, 0x16, 0x4f, 0x23, 0x16, 0x31, 0x4c, 0x5d, 0xc0, 0x56, + 0xcb, 0x69, 0x6c, 0x94, 0x20, 0x94, 0xcb, 0xee, 0x33, 0xd2, 0xca, 0x8f, 0x02, 0x7d, 0x40, 0xb4, + 0x20, 0xe4, 0x3e, 0xcb, 0x0f, 0xd9, 0xd2, 0xf4, 0x90, 0x65, 0x98, 0xb9, 0x9c, 0x5b, 0x2d, 0x52, + 0x40, 0xfc, 0x75, 0xbf, 0x37, 0x48, 0x7b, 0x7a, 0x1a, 0xe8, 0x63, 0xb2, 0x84, 0x1b, 0x07, 0x66, + 0xd9, 0x23, 0x26, 0xac, 0xd5, 0xcc, 0xb5, 0x34, 0x36, 0x2a, 0x38, 0x54, 0x22, 0xfa, 0x9c, 0x50, + 0x8c, 0xf7, 0x46, 0x91, 0x77, 0x12, 0xbc, 0xb4, 0x42, 0xd4, 0x0a, 0xff, 0xfe, 0x4d, 0x63, 0xa3, + 0x86, 0x85, 0x1a, 0xac, 0xe8, 0x6e, 0x62, 0x1c, 0xe4, 0x76, 0x95, 0xdd, 0x73, 0x1c, 0x2a, 0x11, + 0x7d, 0x4a, 0x56, 0xca, 0x61, 0x1f, 0x31, 0x2f, 0xcc, 0xbd, 0xa1, 0x69, 0x6c, 0x48, 0x0c, 0x48, + 0x71, 0x39, 0x2f, 0xed, 0x8f, 0xe7, 0xf5, 0xb9, 0x41, 0x34, 0xe4, 0x8b, 0xc6, 0xe2, 0x23, 0x80, + 0x1d, 0xe7, 0x37, 0xa1, 0x6c, 0x5c, 0x30, 0x20, 0xc5, 0xf4, 0x15, 0xd9, 0xb8, 0x85, 0xec, 0xf3, + 0xf7, 0x9e, 0xcb, 0xad, 0x41, 0x31, 0xb5, 0xff, 0xd2, 0xd8, 0xa8, 0x4f, 0x80, 0x7a, 0x38, 0xf3, + 0xc0, 0xae, 0x60, 0x78, 0x74, 0x9a, 0xa5, 0x07, 0xb3, 0x2c, 0xd4, 0x60, 0xd9, 0x44, 0x10, 0xc5, + 0x21, 0x96, 0x13, 0xc1, 0x7e, 0xe5, 0x44, 0x30, 0x05, 0xc4, 0x5f, 0xf7, 0x53, 0x93, 0x68, 0xc8, + 0x67, 0x13, 0x19, 0x31, 0x6b, 0x20, 0x92, 0xb3, 0x6b, 0x74, 0xdb, 0x8a, 0x2a, 0x03, 0x52, 0x5c, + 0xd1, 0xa2, 0x41, 0xe8, 0x89, 0xac, 0x45, 0x06, 0xa4, 0x98, 0xee, 0x91, 0x7f, 0x06, 0xcc, 0xe6, + 0xe3, 0x89, 0x8f, 0x17, 0x4d, 0xb4, 0x16, 0xd7, 0x66, 0x23, 0x8d, 0x8d, 0x59, 0x12, 0x66, 0x21, + 0xb9, 0x88, 0xd8, 0x43, 0xab, 0xbe, 0x88, 0xd8, 0xc6, 0x2c, 0x44, 0x77, 0xc9, 0xaa, 0xbc, 0x8f, + 0x36, 0x96, 0x58, 0x4f, 0x63, 0x43, 0xa6, 0x40, 0x06, 0x32, 0x39, 0xda, 0xbb, 0x1f, 0x4d, 0x5c, + 0xc7, 0xb6, 0x32, 0xf9, 0x62, 0x29, 0x97, 0x28, 0x90, 0x01, 0xb3, 0x7f, 0x71, 0xa5, 0x2b, 0x97, + 0x57, 0xba, 0x72, 0x73, 0xa5, 0xab, 0x1f, 0x13, 0x5d, 0xfd, 0x96, 0xe8, 0xea, 0x79, 0xa2, 0xab, + 0x17, 0x89, 0xae, 0xfe, 0x4c, 0x74, 0xf5, 0x57, 0xa2, 0x2b, 0x37, 0x89, 0xae, 0x7e, 0xb9, 0xd6, + 0x95, 0x8b, 0x6b, 0x5d, 0xb9, 0xbc, 0xd6, 0x95, 0x77, 0xf7, 0x6e, 0xbf, 0x6a, 0xbe, 0x75, 0x6c, + 0x79, 0x56, 0xcf, 0xe5, 0x27, 0x4e, 0xaf, 0xee, 0x59, 0xec, 0x2f, 0xe0, 0xdb, 0xf6, 0xe8, 0x77, + 0x00, 0x00, 0x00, 0xff, 0xff, 0xa9, 0x02, 0x9d, 0x66, 0x35, 0x07, 0x00, 0x00, } func (this *Result) Equal(that interface{}) bool { @@ -571,6 +581,9 @@ func (this *Summary) Equal(that interface{}) bool { if this.ExecTime != that1.ExecTime { return false } + if this.QueueTime != that1.QueueTime { + return false + } return true } func (this *Querier) Equal(that interface{}) bool { @@ -721,13 +734,14 @@ func (this *Summary) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 10) s = append(s, "&stats.Summary{") s = append(s, "BytesProcessedPerSecond: "+fmt.Sprintf("%#v", this.BytesProcessedPerSecond)+",\n") s = append(s, "LinesProcessedPerSecond: "+fmt.Sprintf("%#v", this.LinesProcessedPerSecond)+",\n") s = append(s, "TotalBytesProcessed: "+fmt.Sprintf("%#v", this.TotalBytesProcessed)+",\n") s = append(s, "TotalLinesProcessed: "+fmt.Sprintf("%#v", this.TotalLinesProcessed)+",\n") s = append(s, "ExecTime: "+fmt.Sprintf("%#v", this.ExecTime)+",\n") + s = append(s, "QueueTime: "+fmt.Sprintf("%#v", this.QueueTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -864,6 +878,11 @@ func (m *Summary) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.QueueTime != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.QueueTime)) + i-- + dAtA[i] = 0x30 + } if m.ExecTime != 0 { i -= 8 encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ExecTime)))) @@ -1127,6 +1146,9 @@ func (m *Summary) Size() (n int) { if m.ExecTime != 0 { n += 9 } + if m.QueueTime != 0 { + n += 1 + sovStats(uint64(m.QueueTime)) + } return n } @@ -1239,6 +1261,7 @@ func (this *Summary) String() string { `TotalBytesProcessed:` + fmt.Sprintf("%v", this.TotalBytesProcessed) + `,`, `TotalLinesProcessed:` + fmt.Sprintf("%v", this.TotalLinesProcessed) + `,`, `ExecTime:` + fmt.Sprintf("%v", this.ExecTime) + `,`, + `QueueTime:` + fmt.Sprintf("%v", this.QueueTime) + `,`, `}`, }, "") return s @@ -1571,6 +1594,25 @@ func (m *Summary) Unmarshal(dAtA []byte) error { v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) iNdEx += 8 m.ExecTime = float64(math.Float64frombits(v)) + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field QueueTime", wireType) + } + m.QueueTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.QueueTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/logqlmodel/stats/stats.proto b/pkg/logqlmodel/stats/stats.proto index 783433e4ad..ff2c6f3994 100644 --- a/pkg/logqlmodel/stats/stats.proto +++ b/pkg/logqlmodel/stats/stats.proto @@ -28,6 +28,8 @@ message Summary { int64 totalLinesProcessed = 4 [(gogoproto.jsontag) = "totalLinesProcessed"]; // Execution time in seconds. double execTime = 5 [(gogoproto.jsontag) = "execTime"]; + // Queue time in nanoseconds. + int64 queueTime = 6 [(gogoproto.jsontag) = "queueTime"]; } message Querier { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b2b6bf2d69..c57273e84b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -236,15 +236,19 @@ func (t *Loki) initQuerier() (services.Service, error) { SchedulerRing: scheduler.SafeReadRing(t.queryScheduler), } + httpMiddleware := middleware.Merge( + httpreq.ExtractQueryMetricsMiddleware(), + ) + queryHandlers := map[string]http.Handler{ - "/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler), - "/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler), + "/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler)), + "/loki/api/v1/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler)), "/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), "/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler), - "/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler), + "/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler)), "/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler), "/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler), "/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler), diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index a49db3f6b3..4fa7df63e4 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -903,10 +903,11 @@ var ( }, "summary": { "bytesProcessedPerSecond": 20, - "execTime": 21, - "linesProcessedPerSecond": 22, - "totalBytesProcessed": 23, - "totalLinesProcessed": 24 + "queueTime": 21, + "execTime": 22, + "linesProcessedPerSecond": 23, + "totalBytesProcessed": 24, + "totalLinesProcessed": 25 } },` matrixString = `{ @@ -1052,10 +1053,11 @@ var ( statsResult = stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 20, - ExecTime: 21, - LinesProcessedPerSecond: 22, - TotalBytesProcessed: 23, - TotalLinesProcessed: 24, + QueueTime: 21, + ExecTime: 22, + LinesProcessedPerSecond: 23, + TotalBytesProcessed: 24, + TotalLinesProcessed: 25, }, Querier: stats.Querier{ Store: stats.Store{ diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index c8387dd345..c17a17be13 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -120,12 +120,12 @@ func TestResponseToResult(t *testing.T) { }}, }, Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, }, }, expected: logqlmodel.Result{ Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, }, Data: logqlmodel.Streams{{ Labels: `{foo="bar"}`, @@ -144,7 +144,7 @@ func TestResponseToResult(t *testing.T) { desc: "LokiPromResponse", input: &LokiPromResponse{ Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, }, Response: &queryrange.PrometheusResponse{ Data: queryrange.PrometheusData{ @@ -154,7 +154,7 @@ func TestResponseToResult(t *testing.T) { }, expected: logqlmodel.Result{ Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, }, Data: sampleStreamToMatrix(testSampleStreams()), }, @@ -310,7 +310,7 @@ func TestInstanceDownstream(t *testing.T) { }}, }, Statistics: stats.Result{ - Summary: stats.Summary{ExecTime: 1}, + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, }, } } diff --git a/pkg/querier/queryrange/prometheus_test.go b/pkg/querier/queryrange/prometheus_test.go index 827dd1de89..d34d286a62 100644 --- a/pkg/querier/queryrange/prometheus_test.go +++ b/pkg/querier/queryrange/prometheus_test.go @@ -49,6 +49,7 @@ var emptyStats = `"stats": { }, "summary": { "bytesProcessedPerSecond": 0, + "queueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 154d14511e..ad141589c1 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -102,8 +102,9 @@ func StatsCollectorMiddleware() queryrange.Middleware { } } if statistics != nil { - // Re-calculate the summary then log and record metrics for the current query - statistics.ComputeSummary(time.Since(start)) + // Re-calculate the summary: the queueTime result is already merged so should not be updated + // Log and record metrics for the current query + statistics.ComputeSummary(time.Since(start), 0) statistics.Log(level.Debug(logger)) } ctxValue := ctx.Value(ctxKey) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 498114f2b3..56b957d133 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "flag" "io" "net/http" + "net/textproto" "sync" "time" @@ -37,6 +38,7 @@ import ( lokiutil "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" + lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" ) var ( @@ -247,7 +249,7 @@ type schedulerRequest struct { request *httpgrpc.HTTPRequest statsEnabled bool - enqueueTime time.Time + queueTime time.Time ctx context.Context ctxCancel context.CancelFunc @@ -396,7 +398,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr req.parentSpanContext = parentSpanContext req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext)) - req.enqueueTime = now + req.queueTime = now req.ctxCancel = cancel // aggregate the max queriers limit in the case of a multi tenant query @@ -462,9 +464,16 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL r := req.(*schedulerRequest) - s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds()) + reqQueueTime := time.Since(r.queueTime) + s.queueDuration.Observe(reqQueueTime.Seconds()) r.queueSpan.Finish() + // Add HTTP header to the request containing the query queue time + r.request.Headers = append(r.request.Headers, &httpgrpc.Header{ + Key: textproto.CanonicalMIMEHeaderKey(string(lokihttpreq.QueryQueueTimeHTTPHeader)), + Values: []string{reqQueueTime.String()}, + }) + /* We want to dequeue the next unexpired request from the chosen tenant queue. The chance of choosing a particular tenant for dequeueing is (1/active_tenants). diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 98fea8fdf4..9144ef0b5e 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1665,7 +1665,7 @@ func Benchmark_store_OverlappingChunks(b *testing.B) { b.Fatal(err) } } - r := statsCtx.Result(time.Since(start)) + r := statsCtx.Result(time.Since(start), 0) b.Log("Total chunks:" + fmt.Sprintf("%d", r.TotalChunksRef())) b.Log("Total bytes decompressed:" + fmt.Sprintf("%d", r.TotalDecompressedBytes())) } diff --git a/pkg/util/httpreq/tags.go b/pkg/util/httpreq/tags.go index 222b37e51c..d310dbe59d 100644 --- a/pkg/util/httpreq/tags.go +++ b/pkg/util/httpreq/tags.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "regexp" + "time" "github.com/weaveworks/common/middleware" ) @@ -16,6 +17,7 @@ var ( QueryTagsHTTPHeader ctxKey = "X-Query-Tags" safeQueryTags = regexp.MustCompile("[^a-zA-Z0-9-=, ]+") // only alpha-numeric, ' ', ',', '=' and `-` + QueryQueueTimeHTTPHeader ctxKey = "X-Query-Queue-Time" ) func ExtractQueryTagsMiddleware() middleware.Interface { @@ -33,3 +35,22 @@ func ExtractQueryTagsMiddleware() middleware.Interface { }) }) } + +func ExtractQueryMetricsMiddleware() middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + + queueTimeHeader := req.Header.Get(string(QueryQueueTimeHTTPHeader)) + if queueTimeHeader != "" { + queueTime, err := time.ParseDuration(queueTimeHeader) + if err == nil { + ctx = context.WithValue(ctx, QueryQueueTimeHTTPHeader, queueTime) + req = req.WithContext(ctx) + } + } + + next.ServeHTTP(w, req) + }) + }) +} diff --git a/pkg/util/httpreq/tags_test.go b/pkg/util/httpreq/tags_test.go index 09b651a289..bf96142ca8 100644 --- a/pkg/util/httpreq/tags_test.go +++ b/pkg/util/httpreq/tags_test.go @@ -4,6 +4,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -49,3 +50,44 @@ func TestQueryTags(t *testing.T) { }) } } + +func TestQueryMetrics(t *testing.T) { + for _, tc := range []struct { + desc string + in string + exp interface{} + error bool + }{ + { + desc: "valid time duration", + in: `2s`, + exp: 2 * time.Second, + }, + { + desc: "empty header", + in: ``, + exp: nil, + }, + { + desc: "invalid time duration", + in: `foo`, + exp: nil, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + req := httptest.NewRequest("GET", "http://testing.com", nil) + req.Header.Set(string(QueryQueueTimeHTTPHeader), tc.in) + + w := httptest.NewRecorder() + checked := false + mware := ExtractQueryMetricsMiddleware().Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + require.Equal(t, tc.exp, req.Context().Value(QueryQueueTimeHTTPHeader)) + checked = true + })) + + mware.ServeHTTP(w, req) + + assert.True(t, true, checked) + }) + } +} diff --git a/pkg/util/marshal/legacy/marshal_test.go b/pkg/util/marshal/legacy/marshal_test.go index 54dc23571d..6363ea805d 100644 --- a/pkg/util/marshal/legacy/marshal_test.go +++ b/pkg/util/marshal/legacy/marshal_test.go @@ -80,6 +80,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "queueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index 8e29323612..aef5589f5b 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -86,6 +86,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "queueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, @@ -193,6 +194,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "queueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0, @@ -317,6 +319,7 @@ var queryTests = []struct { }, "summary": { "bytesProcessedPerSecond": 0, + "queueTime": 0, "execTime": 0, "linesProcessedPerSecond": 0, "totalBytesProcessed":0,