Log time in queue per request (#4949)

* Log time in queue per request

* Add querierID to log message

* Add unescape request url and tenant_ids to log message

* Add query enqueue time to metrics.go

* Remove "querier request dequeued" log from scheduler

* Address comments

* Rename enqueue_time to queue_time
* Store queue time in nanoseconds (int64)
* Use CanonicalMIMEHeaderKey for setting the httpgrpc header
pull/5038/head
Susana Ferreira 4 years ago committed by GitHub
parent 8b71a44a86
commit 82f8f3c1e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 1
      docs/sources/api/_index.md
  3. 4
      pkg/chunkenc/memchunk_test.go
  4. 2
      pkg/iter/entry_iterator_test.go
  5. 5
      pkg/logql/engine.go
  6. 6
      pkg/logql/engine_test.go
  7. 1
      pkg/logql/metrics.go
  8. 3
      pkg/logql/metrics_test.go
  9. 13
      pkg/logqlmodel/stats/context.go
  10. 12
      pkg/logqlmodel/stats/context_test.go
  11. 134
      pkg/logqlmodel/stats/stats.pb.go
  12. 2
      pkg/logqlmodel/stats/stats.proto
  13. 10
      pkg/loki/modules.go
  14. 18
      pkg/querier/queryrange/codec_test.go
  15. 10
      pkg/querier/queryrange/downstreamer_test.go
  16. 1
      pkg/querier/queryrange/prometheus_test.go
  17. 5
      pkg/querier/queryrange/stats.go
  18. 15
      pkg/scheduler/scheduler.go
  19. 2
      pkg/storage/batch_test.go
  20. 21
      pkg/util/httpreq/tags.go
  21. 42
      pkg/util/httpreq/tags_test.go
  22. 1
      pkg/util/marshal/legacy/marshal_test.go
  23. 3
      pkg/util/marshal/marshal_test.go

@ -1,5 +1,6 @@
## Main
* [4949](https://github.com/grafana/loki/pull/4949) **ssncferreira**: Add query `queueTime` metric to statistics and metrics.go
* [4938](https://github.com/grafana/loki/pull/4938) **DylanGuedes**: Implement ring status page for the distributor
* [5023](https://github.com/grafana/loki/pull/5023) **ssncferreira**: Move `querier.split-queries-by-interval` to a per-tenant configuration
* [4993](https://github.com/grafana/loki/pull/4926) **thejosephstevens**: Fix parent of wal and wal_cleaner in loki ruler config docs

@ -939,6 +939,7 @@ The example belows show all possible statistics returned with their respective d
},
"summary": {
"bytesProcessedPerSecond": 0, // Total of bytes processed per second
"queueTime": 0, // Total queue time in nanoseconds (int)
"execTime": 0, // Total execution time in seconds (float)
"linesProcessedPerSecond": 0, // Total lines processed per second
"totalBytesProcessed":0, // Total amount of bytes processed overall for this request

@ -567,7 +567,7 @@ func TestChunkStats(t *testing.T) {
t.Fatal(err)
}
// test on a chunk filling up
s := statsCtx.Result(time.Since(first))
s := statsCtx.Result(time.Since(first), 0)
require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed)
require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed)
@ -594,7 +594,7 @@ func TestChunkStats(t *testing.T) {
if err := it.Close(); err != nil {
t.Fatal(err)
}
s = statsCtx.Result(time.Since(first))
s = statsCtx.Result(time.Since(first), 0)
require.Equal(t, int64(expectedSize), s.Summary.TotalBytesProcessed)
require.Equal(t, int64(inserted), s.Summary.TotalLinesProcessed)

@ -550,7 +550,7 @@ func Test_DuplicateCount(t *testing.T) {
defer it.Close()
for it.Next() {
}
require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0).TotalDuplicates())
require.Equal(t, test.expectedDuplicates, stats.FromContext(ctx).Result(0, 0).TotalDuplicates())
})
}
}

@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
)
var (
@ -120,7 +121,9 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
data, err := q.Eval(ctx)
statResult := statsCtx.Result(time.Since(start))
queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime)
statResult.Log(level.Debug(log))
status := "200"

@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
)
var (
@ -2062,6 +2063,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it
func TestEngine_Stats(t *testing.T) {
eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits)
queueTime := 2 * time.Nanosecond
q := eng.Query(LiteralParams{
qs: `{foo="bar"}`,
start: time.Now(),
@ -2069,9 +2071,11 @@ func TestEngine_Stats(t *testing.T) {
direction: logproto.BACKWARD,
limit: 1000,
})
r, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
ctx := context.WithValue(context.Background(), httpreq.QueryQueueTimeHTTPHeader, queueTime)
r, err := q.Exec(user.InjectOrgID(ctx, "fake"))
require.NoError(t, err)
require.Equal(t, int64(1), r.Statistics.TotalDecompressedBytes())
require.Equal(t, queueTime.Nanoseconds(), r.Statistics.Summary.QueueTime)
}
type errorIteratorQuerier struct {

@ -106,6 +106,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"queue_time", time.Duration(stats.Summary.QueueTime),
}...)
logValues = append(logValues, tagsToKeyValues(queryTags)...)

@ -76,13 +76,14 @@ func TestLogSlowQuery(t *testing.T) {
}, "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
QueueTime: 2,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB source=logvolhist feature=beta\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB queue_time=2ns source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())

@ -91,7 +91,7 @@ func (c *Context) Reset() {
}
// Result calculates the summary based on store and ingester data.
func (c *Context) Result(execTime time.Duration) Result {
func (c *Context) Result(execTime time.Duration, queueTime time.Duration) Result {
r := c.result
r.Merge(Result{
@ -101,7 +101,7 @@ func (c *Context) Result(execTime time.Duration) Result {
Ingester: c.ingester,
})
r.ComputeSummary(execTime)
r.ComputeSummary(execTime, queueTime)
return r
}
@ -125,7 +125,7 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
}
// ComputeSummary compute the summary of the statistics.
func (r *Result) ComputeSummary(execTime time.Duration) {
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
@ -139,6 +139,9 @@ func (r *Result) ComputeSummary(execTime time.Duration) {
int64(float64(r.Summary.TotalLinesProcessed) /
execTime.Seconds())
}
if queueTime != 0 {
r.Summary.QueueTime = int64(queueTime)
}
}
func (s *Store) Merge(m Store) {
@ -168,7 +171,8 @@ func (i *Ingester) Merge(m Ingester) {
func (r *Result) Merge(m Result) {
r.Querier.Merge(m.Querier)
r.Ingester.Merge(m.Ingester)
r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime + m.Summary.ExecTime) * float64(time.Second))))
r.ComputeSummary(time.Duration(int64((r.Summary.ExecTime+m.Summary.ExecTime)*float64(time.Second))),
time.Duration(r.Summary.QueueTime+m.Summary.QueueTime))
}
func (r Result) ChunksDownloadTime() time.Duration {
@ -281,5 +285,6 @@ func (s Summary) Log(log log.Logger) {
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", s.TotalLinesProcessed,
"Summary.ExecTime", time.Duration(int64(s.ExecTime*float64(time.Second))),
"Summary.QueueTime", time.Duration(s.QueueTime),
)
}

@ -25,7 +25,7 @@ func TestResult(t *testing.T) {
fakeIngesterQuery(ctx)
fakeIngesterQuery(ctx)
res := stats.Result(2 * time.Second)
res := stats.Result(2*time.Second, 2*time.Nanosecond)
res.Log(util_log.Logger)
expected := Result{
Ingester: Ingester{
@ -61,6 +61,7 @@ func TestResult(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Nanoseconds(),
BytesProcessedPerSecond: int64(42),
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
@ -106,6 +107,7 @@ func TestSnapshot_JoinResults(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Nanoseconds(),
BytesProcessedPerSecond: int64(42),
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
@ -114,7 +116,7 @@ func TestSnapshot_JoinResults(t *testing.T) {
}
JoinResults(ctx, expected)
res := statsCtx.Result(2 * time.Second)
res := statsCtx.Result(2*time.Second, 2*time.Nanosecond)
require.Equal(t, expected, res)
}
@ -177,6 +179,7 @@ func TestResult_Merge(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Nanoseconds(),
BytesProcessedPerSecond: int64(42),
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: int64(84),
@ -223,6 +226,7 @@ func TestResult_Merge(t *testing.T) {
},
Summary: Summary{
ExecTime: 2 * 2 * time.Second.Seconds(),
QueueTime: 2 * 2 * time.Nanosecond.Nanoseconds(),
BytesProcessedPerSecond: int64(42), // 2 requests at the same pace should give the same bytes/lines per sec
LinesProcessedPerSecond: int64(50),
TotalBytesProcessed: 2 * int64(84),
@ -234,10 +238,10 @@ func TestResult_Merge(t *testing.T) {
func TestReset(t *testing.T) {
statsCtx, ctx := NewContext(context.Background())
fakeIngesterQuery(ctx)
res := statsCtx.Result(2 * time.Second)
res := statsCtx.Result(2*time.Second, 2*time.Millisecond)
require.NotEmpty(t, res)
statsCtx.Reset()
res = statsCtx.Result(0)
res = statsCtx.Result(0, 0)
require.Empty(t, res)
}

@ -98,6 +98,8 @@ type Summary struct {
TotalLinesProcessed int64 `protobuf:"varint,4,opt,name=totalLinesProcessed,proto3" json:"totalLinesProcessed"`
// Execution time in seconds.
ExecTime float64 `protobuf:"fixed64,5,opt,name=execTime,proto3" json:"execTime"`
// Queue time in nanoseconds.
QueueTime int64 `protobuf:"varint,6,opt,name=queueTime,proto3" json:"queueTime"`
}
func (m *Summary) Reset() { *m = Summary{} }
@ -167,6 +169,13 @@ func (m *Summary) GetExecTime() float64 {
return 0
}
func (m *Summary) GetQueueTime() int64 {
if m != nil {
return m.QueueTime
}
return 0
}
type Querier struct {
Store Store `protobuf:"bytes,1,opt,name=store,proto3" json:"store"`
}
@ -460,51 +469,52 @@ func init() {
func init() { proto.RegisterFile("pkg/logqlmodel/stats/stats.proto", fileDescriptor_6cdfe5d2aea33ebb) }
var fileDescriptor_6cdfe5d2aea33ebb = []byte{
// 693 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xbf, 0x6f, 0xd3, 0x40,
0x14, 0xb6, 0x13, 0xdc, 0x84, 0xa3, 0xb4, 0xe5, 0xaa, 0xd2, 0x00, 0x92, 0x5d, 0x65, 0xea, 0x00,
0x8d, 0xf8, 0xb1, 0x80, 0xe8, 0xe2, 0x56, 0x48, 0x95, 0x40, 0x94, 0x57, 0x58, 0xd8, 0x1c, 0xe7,
0x9a, 0x58, 0x75, 0x7c, 0xa9, 0x7f, 0x08, 0xba, 0xb1, 0x31, 0xc2, 0x9f, 0xc1, 0xc2, 0x9f, 0xc0,
0xde, 0xb1, 0x63, 0x27, 0x8b, 0xba, 0x0b, 0xf2, 0xd4, 0x8d, 0x15, 0xf9, 0x9d, 0x63, 0xd7, 0x17,
0x47, 0x62, 0xb1, 0xef, 0x7d, 0xdf, 0xfb, 0xde, 0x3b, 0xbf, 0xef, 0xac, 0x23, 0x1b, 0x93, 0xa3,
0x61, 0xcf, 0xe5, 0xc3, 0x63, 0x77, 0xcc, 0x07, 0xcc, 0xed, 0x05, 0xa1, 0x15, 0x06, 0xe2, 0xb9,
0x35, 0xf1, 0x79, 0xc8, 0xa9, 0x86, 0xc1, 0xfd, 0x47, 0x43, 0x27, 0x1c, 0x45, 0xfd, 0x2d, 0x9b,
0x8f, 0x7b, 0x43, 0x3e, 0xe4, 0x3d, 0x64, 0xfb, 0xd1, 0x21, 0x46, 0x18, 0xe0, 0x4a, 0xa8, 0xba,
0xbf, 0x54, 0xb2, 0x00, 0x2c, 0x88, 0xdc, 0x90, 0x3e, 0x27, 0xad, 0x20, 0x1a, 0x8f, 0x2d, 0xff,
0xa4, 0xa3, 0x6e, 0xa8, 0x9b, 0xb7, 0x9e, 0x2c, 0x6d, 0x89, 0xfa, 0x07, 0x02, 0x35, 0x97, 0x4f,
0x63, 0x43, 0x49, 0x63, 0x63, 0x9a, 0x06, 0xd3, 0x45, 0x26, 0x3d, 0x8e, 0x98, 0xef, 0x30, 0xbf,
0xd3, 0xa8, 0x48, 0xdf, 0x09, 0xb4, 0x94, 0xe6, 0x69, 0x30, 0x5d, 0xd0, 0x6d, 0xd2, 0x76, 0xbc,
0x21, 0x0b, 0x42, 0xe6, 0x77, 0x9a, 0xa8, 0x5d, 0xce, 0xb5, 0x7b, 0x39, 0x6c, 0xae, 0xe4, 0xe2,
0x22, 0x11, 0x8a, 0x55, 0xf7, 0x6f, 0x83, 0xb4, 0xf2, 0xfd, 0xd1, 0x0f, 0x64, 0xbd, 0x7f, 0x12,
0xb2, 0x60, 0xdf, 0xe7, 0x36, 0x0b, 0x02, 0x36, 0xd8, 0x67, 0xfe, 0x01, 0xb3, 0xb9, 0x37, 0xc0,
0x0f, 0x6a, 0x9a, 0x0f, 0xd2, 0xd8, 0x98, 0x97, 0x02, 0xf3, 0x88, 0xac, 0xac, 0xeb, 0x78, 0xb5,
0x65, 0x1b, 0x65, 0xd9, 0x39, 0x29, 0x30, 0x8f, 0xa0, 0x7b, 0x64, 0x35, 0xe4, 0xa1, 0xe5, 0x9a,
0x95, 0xb6, 0x38, 0x83, 0xa6, 0xb9, 0x9e, 0xc6, 0x46, 0x1d, 0x0d, 0x75, 0x60, 0x51, 0xea, 0x75,
0xa5, 0x55, 0xe7, 0x86, 0x54, 0xaa, 0x4a, 0x43, 0x1d, 0x48, 0x37, 0x49, 0x9b, 0x7d, 0x66, 0xf6,
0x7b, 0x67, 0xcc, 0x3a, 0xda, 0x86, 0xba, 0xa9, 0x9a, 0x8b, 0xd9, 0xe4, 0xa7, 0x18, 0x14, 0xab,
0xee, 0x4b, 0xd2, 0xca, 0xdd, 0xa5, 0x8f, 0x89, 0x16, 0x84, 0xdc, 0x67, 0xf9, 0xb9, 0x59, 0x9c,
0x9e, 0x9b, 0x0c, 0x33, 0x6f, 0xe7, 0xee, 0x89, 0x14, 0x10, 0xaf, 0xee, 0xcf, 0x06, 0x69, 0x4f,
0x0d, 0xa6, 0xcf, 0xc8, 0x22, 0xee, 0x05, 0x98, 0x65, 0x8f, 0x98, 0x70, 0x4b, 0x33, 0x57, 0xd2,
0xd8, 0xa8, 0xe0, 0x50, 0x89, 0xe8, 0x2b, 0x42, 0x31, 0xde, 0x19, 0x45, 0xde, 0x51, 0xf0, 0xc6,
0x0a, 0x51, 0x2b, 0x2c, 0xb9, 0x9b, 0xc6, 0x46, 0x0d, 0x0b, 0x35, 0x58, 0xd1, 0xdd, 0xc4, 0x38,
0xc8, 0x1d, 0x28, 0xbb, 0xe7, 0x38, 0x54, 0x22, 0xfa, 0x82, 0x2c, 0x95, 0xf3, 0x3b, 0x60, 0x5e,
0x98, 0x8f, 0x9b, 0xa6, 0xb1, 0x21, 0x31, 0x20, 0xc5, 0xe5, 0xbc, 0xb4, 0xff, 0x9e, 0xd7, 0xb7,
0x06, 0xd1, 0x90, 0x2f, 0x1a, 0x8b, 0x8f, 0x00, 0x76, 0x98, 0x1f, 0xee, 0xb2, 0x71, 0xc1, 0x80,
0x14, 0xd3, 0xb7, 0x64, 0xed, 0x1a, 0xb2, 0xcb, 0x3f, 0x79, 0x2e, 0xb7, 0x06, 0xc5, 0xd4, 0xee,
0xa5, 0xb1, 0x51, 0x9f, 0x00, 0xf5, 0x70, 0xe6, 0x81, 0x5d, 0xc1, 0xf0, 0xe0, 0x34, 0x4b, 0x0f,
0x66, 0x59, 0xa8, 0xc1, 0xb2, 0x89, 0x20, 0x8a, 0x43, 0x2c, 0x27, 0x82, 0xfd, 0xca, 0x89, 0x60,
0x0a, 0x88, 0x57, 0xf7, 0x6b, 0x93, 0x68, 0xc8, 0x67, 0x13, 0x19, 0x31, 0x6b, 0x20, 0x92, 0xb3,
0x3f, 0xe3, 0xba, 0x15, 0x55, 0x06, 0xa4, 0xb8, 0xa2, 0x45, 0x83, 0xd0, 0x13, 0x59, 0x8b, 0x0c,
0x48, 0x31, 0xdd, 0x21, 0x77, 0x06, 0xcc, 0xe6, 0xe3, 0x89, 0x8f, 0xff, 0x8e, 0x68, 0xbd, 0x80,
0xf2, 0xb5, 0x34, 0x36, 0x66, 0x49, 0x98, 0x85, 0xe4, 0x22, 0x62, 0x0f, 0xad, 0xfa, 0x22, 0x62,
0x1b, 0xb3, 0x10, 0xdd, 0x26, 0xcb, 0xf2, 0x3e, 0xda, 0x58, 0x62, 0x35, 0x8d, 0x0d, 0x99, 0x02,
0x19, 0xc8, 0xe4, 0x68, 0xef, 0x6e, 0x34, 0x71, 0x1d, 0xdb, 0xca, 0xe4, 0x37, 0x4b, 0xb9, 0x44,
0x81, 0x0c, 0x98, 0xfd, 0xb3, 0x0b, 0x5d, 0x39, 0xbf, 0xd0, 0x95, 0xab, 0x0b, 0x5d, 0xfd, 0x92,
0xe8, 0xea, 0x8f, 0x44, 0x57, 0x4f, 0x13, 0x5d, 0x3d, 0x4b, 0x74, 0xf5, 0x77, 0xa2, 0xab, 0x7f,
0x12, 0x5d, 0xb9, 0x4a, 0x74, 0xf5, 0xfb, 0xa5, 0xae, 0x9c, 0x5d, 0xea, 0xca, 0xf9, 0xa5, 0xae,
0x7c, 0x7c, 0x78, 0xfd, 0xa2, 0xf2, 0xad, 0x43, 0xcb, 0xb3, 0x7a, 0x2e, 0x3f, 0x72, 0x7a, 0x75,
0x37, 0x5d, 0x7f, 0x01, 0xaf, 0xab, 0xa7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x2e, 0xbd, 0xac,
0xdd, 0x08, 0x07, 0x00, 0x00,
// 717 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0x4d, 0x6f, 0xd3, 0x4a,
0x14, 0xb5, 0x93, 0xe7, 0x26, 0x9d, 0xd7, 0xaf, 0x37, 0x55, 0x5f, 0xf3, 0x1e, 0x92, 0x5d, 0x65,
0x55, 0x09, 0x68, 0xc4, 0xc7, 0x06, 0x44, 0x37, 0x6e, 0x85, 0x54, 0x09, 0x44, 0xb9, 0x85, 0x0d,
0x3b, 0xc7, 0x99, 0x26, 0x56, 0x1d, 0x4f, 0xea, 0x0f, 0x41, 0x77, 0xec, 0x58, 0xc2, 0x8f, 0x60,
0xc1, 0x86, 0x9f, 0xc0, 0xbe, 0xcb, 0x2e, 0xbb, 0xb2, 0xa8, 0xbb, 0x41, 0x5e, 0xf5, 0x27, 0x20,
0xdf, 0x71, 0xec, 0x7a, 0xe2, 0x48, 0x6c, 0x92, 0xb9, 0xe7, 0xdc, 0x73, 0xef, 0xf8, 0x9e, 0x19,
0x0d, 0xd9, 0x9a, 0x9c, 0x0c, 0x7b, 0x2e, 0x1f, 0x9e, 0xba, 0x63, 0x3e, 0x60, 0x6e, 0x2f, 0x08,
0xad, 0x30, 0x10, 0xbf, 0x3b, 0x13, 0x9f, 0x87, 0x9c, 0x6a, 0x18, 0xfc, 0x7f, 0x7f, 0xe8, 0x84,
0xa3, 0xa8, 0xbf, 0x63, 0xf3, 0x71, 0x6f, 0xc8, 0x87, 0xbc, 0x87, 0x6c, 0x3f, 0x3a, 0xc6, 0x08,
0x03, 0x5c, 0x09, 0x55, 0xf7, 0x87, 0x4a, 0x16, 0x80, 0x05, 0x91, 0x1b, 0xd2, 0x27, 0xa4, 0x15,
0x44, 0xe3, 0xb1, 0xe5, 0x9f, 0x75, 0xd4, 0x2d, 0x75, 0xfb, 0xef, 0x87, 0x2b, 0x3b, 0xa2, 0xfe,
0x91, 0x40, 0xcd, 0xd5, 0xf3, 0xd8, 0x50, 0xd2, 0xd8, 0x98, 0xa6, 0xc1, 0x74, 0x91, 0x49, 0x4f,
0x23, 0xe6, 0x3b, 0xcc, 0xef, 0x34, 0x2a, 0xd2, 0xd7, 0x02, 0x2d, 0xa5, 0x79, 0x1a, 0x4c, 0x17,
0x74, 0x97, 0xb4, 0x1d, 0x6f, 0xc8, 0x82, 0x90, 0xf9, 0x9d, 0x26, 0x6a, 0x57, 0x73, 0xed, 0x41,
0x0e, 0x9b, 0x6b, 0xb9, 0xb8, 0x48, 0x84, 0x62, 0xd5, 0xfd, 0xda, 0x24, 0xad, 0x7c, 0x7f, 0xf4,
0x2d, 0xd9, 0xec, 0x9f, 0x85, 0x2c, 0x38, 0xf4, 0xb9, 0xcd, 0x82, 0x80, 0x0d, 0x0e, 0x99, 0x7f,
0xc4, 0x6c, 0xee, 0x0d, 0xf0, 0x83, 0x9a, 0xe6, 0x9d, 0x34, 0x36, 0xe6, 0xa5, 0xc0, 0x3c, 0x22,
0x2b, 0xeb, 0x3a, 0x5e, 0x6d, 0xd9, 0x46, 0x59, 0x76, 0x4e, 0x0a, 0xcc, 0x23, 0xe8, 0x01, 0x59,
0x0f, 0x79, 0x68, 0xb9, 0x66, 0xa5, 0x2d, 0xce, 0xa0, 0x69, 0x6e, 0xa6, 0xb1, 0x51, 0x47, 0x43,
0x1d, 0x58, 0x94, 0x7a, 0x51, 0x69, 0xd5, 0xf9, 0x4b, 0x2a, 0x55, 0xa5, 0xa1, 0x0e, 0xa4, 0xdb,
0xa4, 0xcd, 0x3e, 0x30, 0xfb, 0x8d, 0x33, 0x66, 0x1d, 0x6d, 0x4b, 0xdd, 0x56, 0xcd, 0xa5, 0x6c,
0xf2, 0x53, 0x0c, 0x8a, 0x15, 0xbd, 0x4b, 0x16, 0x4f, 0x23, 0x16, 0x31, 0x4c, 0x5d, 0xc0, 0x56,
0xcb, 0x69, 0x6c, 0x94, 0x20, 0x94, 0xcb, 0xee, 0x33, 0xd2, 0xca, 0x8f, 0x02, 0x7d, 0x40, 0xb4,
0x20, 0xe4, 0x3e, 0xcb, 0x0f, 0xd9, 0xd2, 0xf4, 0x90, 0x65, 0x98, 0xb9, 0x9c, 0x5b, 0x2d, 0x52,
0x40, 0xfc, 0x75, 0xbf, 0x37, 0x48, 0x7b, 0x7a, 0x1a, 0xe8, 0x63, 0xb2, 0x84, 0x1b, 0x07, 0x66,
0xd9, 0x23, 0x26, 0xac, 0xd5, 0xcc, 0xb5, 0x34, 0x36, 0x2a, 0x38, 0x54, 0x22, 0xfa, 0x9c, 0x50,
0x8c, 0xf7, 0x46, 0x91, 0x77, 0x12, 0xbc, 0xb4, 0x42, 0xd4, 0x0a, 0xff, 0xfe, 0x4d, 0x63, 0xa3,
0x86, 0x85, 0x1a, 0xac, 0xe8, 0x6e, 0x62, 0x1c, 0xe4, 0x76, 0x95, 0xdd, 0x73, 0x1c, 0x2a, 0x11,
0x7d, 0x4a, 0x56, 0xca, 0x61, 0x1f, 0x31, 0x2f, 0xcc, 0xbd, 0xa1, 0x69, 0x6c, 0x48, 0x0c, 0x48,
0x71, 0x39, 0x2f, 0xed, 0x8f, 0xe7, 0xf5, 0xb9, 0x41, 0x34, 0xe4, 0x8b, 0xc6, 0xe2, 0x23, 0x80,
0x1d, 0xe7, 0x37, 0xa1, 0x6c, 0x5c, 0x30, 0x20, 0xc5, 0xf4, 0x15, 0xd9, 0xb8, 0x85, 0xec, 0xf3,
0xf7, 0x9e, 0xcb, 0xad, 0x41, 0x31, 0xb5, 0xff, 0xd2, 0xd8, 0xa8, 0x4f, 0x80, 0x7a, 0x38, 0xf3,
0xc0, 0xae, 0x60, 0x78, 0x74, 0x9a, 0xa5, 0x07, 0xb3, 0x2c, 0xd4, 0x60, 0xd9, 0x44, 0x10, 0xc5,
0x21, 0x96, 0x13, 0xc1, 0x7e, 0xe5, 0x44, 0x30, 0x05, 0xc4, 0x5f, 0xf7, 0x53, 0x93, 0x68, 0xc8,
0x67, 0x13, 0x19, 0x31, 0x6b, 0x20, 0x92, 0xb3, 0x6b, 0x74, 0xdb, 0x8a, 0x2a, 0x03, 0x52, 0x5c,
0xd1, 0xa2, 0x41, 0xe8, 0x89, 0xac, 0x45, 0x06, 0xa4, 0x98, 0xee, 0x91, 0x7f, 0x06, 0xcc, 0xe6,
0xe3, 0x89, 0x8f, 0x17, 0x4d, 0xb4, 0x16, 0xd7, 0x66, 0x23, 0x8d, 0x8d, 0x59, 0x12, 0x66, 0x21,
0xb9, 0x88, 0xd8, 0x43, 0xab, 0xbe, 0x88, 0xd8, 0xc6, 0x2c, 0x44, 0x77, 0xc9, 0xaa, 0xbc, 0x8f,
0x36, 0x96, 0x58, 0x4f, 0x63, 0x43, 0xa6, 0x40, 0x06, 0x32, 0x39, 0xda, 0xbb, 0x1f, 0x4d, 0x5c,
0xc7, 0xb6, 0x32, 0xf9, 0x62, 0x29, 0x97, 0x28, 0x90, 0x01, 0xb3, 0x7f, 0x71, 0xa5, 0x2b, 0x97,
0x57, 0xba, 0x72, 0x73, 0xa5, 0xab, 0x1f, 0x13, 0x5d, 0xfd, 0x96, 0xe8, 0xea, 0x79, 0xa2, 0xab,
0x17, 0x89, 0xae, 0xfe, 0x4c, 0x74, 0xf5, 0x57, 0xa2, 0x2b, 0x37, 0x89, 0xae, 0x7e, 0xb9, 0xd6,
0x95, 0x8b, 0x6b, 0x5d, 0xb9, 0xbc, 0xd6, 0x95, 0x77, 0xf7, 0x6e, 0xbf, 0x6a, 0xbe, 0x75, 0x6c,
0x79, 0x56, 0xcf, 0xe5, 0x27, 0x4e, 0xaf, 0xee, 0x59, 0xec, 0x2f, 0xe0, 0xdb, 0xf6, 0xe8, 0x77,
0x00, 0x00, 0x00, 0xff, 0xff, 0xa9, 0x02, 0x9d, 0x66, 0x35, 0x07, 0x00, 0x00,
}
func (this *Result) Equal(that interface{}) bool {
@ -571,6 +581,9 @@ func (this *Summary) Equal(that interface{}) bool {
if this.ExecTime != that1.ExecTime {
return false
}
if this.QueueTime != that1.QueueTime {
return false
}
return true
}
func (this *Querier) Equal(that interface{}) bool {
@ -721,13 +734,14 @@ func (this *Summary) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 9)
s := make([]string, 0, 10)
s = append(s, "&stats.Summary{")
s = append(s, "BytesProcessedPerSecond: "+fmt.Sprintf("%#v", this.BytesProcessedPerSecond)+",\n")
s = append(s, "LinesProcessedPerSecond: "+fmt.Sprintf("%#v", this.LinesProcessedPerSecond)+",\n")
s = append(s, "TotalBytesProcessed: "+fmt.Sprintf("%#v", this.TotalBytesProcessed)+",\n")
s = append(s, "TotalLinesProcessed: "+fmt.Sprintf("%#v", this.TotalLinesProcessed)+",\n")
s = append(s, "ExecTime: "+fmt.Sprintf("%#v", this.ExecTime)+",\n")
s = append(s, "QueueTime: "+fmt.Sprintf("%#v", this.QueueTime)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -864,6 +878,11 @@ func (m *Summary) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.QueueTime != 0 {
i = encodeVarintStats(dAtA, i, uint64(m.QueueTime))
i--
dAtA[i] = 0x30
}
if m.ExecTime != 0 {
i -= 8
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.ExecTime))))
@ -1127,6 +1146,9 @@ func (m *Summary) Size() (n int) {
if m.ExecTime != 0 {
n += 9
}
if m.QueueTime != 0 {
n += 1 + sovStats(uint64(m.QueueTime))
}
return n
}
@ -1239,6 +1261,7 @@ func (this *Summary) String() string {
`TotalBytesProcessed:` + fmt.Sprintf("%v", this.TotalBytesProcessed) + `,`,
`TotalLinesProcessed:` + fmt.Sprintf("%v", this.TotalLinesProcessed) + `,`,
`ExecTime:` + fmt.Sprintf("%v", this.ExecTime) + `,`,
`QueueTime:` + fmt.Sprintf("%v", this.QueueTime) + `,`,
`}`,
}, "")
return s
@ -1571,6 +1594,25 @@ func (m *Summary) Unmarshal(dAtA []byte) error {
v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:]))
iNdEx += 8
m.ExecTime = float64(math.Float64frombits(v))
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field QueueTime", wireType)
}
m.QueueTime = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStats
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.QueueTime |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipStats(dAtA[iNdEx:])

@ -28,6 +28,8 @@ message Summary {
int64 totalLinesProcessed = 4 [(gogoproto.jsontag) = "totalLinesProcessed"];
// Execution time in seconds.
double execTime = 5 [(gogoproto.jsontag) = "execTime"];
// Queue time in nanoseconds.
int64 queueTime = 6 [(gogoproto.jsontag) = "queueTime"];
}
message Querier {

@ -236,15 +236,19 @@ func (t *Loki) initQuerier() (services.Service, error) {
SchedulerRing: scheduler.SafeReadRing(t.queryScheduler),
}
httpMiddleware := middleware.Merge(
httpreq.ExtractQueryMetricsMiddleware(),
)
queryHandlers := map[string]http.Handler{
"/loki/api/v1/query_range": http.HandlerFunc(t.Querier.RangeQueryHandler),
"/loki/api/v1/query": http.HandlerFunc(t.Querier.InstantQueryHandler),
"/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler)),
"/loki/api/v1/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler)),
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),
"/api/prom/query": http.HandlerFunc(t.Querier.LogQueryHandler),
"/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler)),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),

@ -903,10 +903,11 @@ var (
},
"summary": {
"bytesProcessedPerSecond": 20,
"execTime": 21,
"linesProcessedPerSecond": 22,
"totalBytesProcessed": 23,
"totalLinesProcessed": 24
"queueTime": 21,
"execTime": 22,
"linesProcessedPerSecond": 23,
"totalBytesProcessed": 24,
"totalLinesProcessed": 25
}
},`
matrixString = `{
@ -1052,10 +1053,11 @@ var (
statsResult = stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 20,
ExecTime: 21,
LinesProcessedPerSecond: 22,
TotalBytesProcessed: 23,
TotalLinesProcessed: 24,
QueueTime: 21,
ExecTime: 22,
LinesProcessedPerSecond: 23,
TotalBytesProcessed: 24,
TotalLinesProcessed: 25,
},
Querier: stats.Querier{
Store: stats.Store{

@ -120,12 +120,12 @@ func TestResponseToResult(t *testing.T) {
}},
},
Statistics: stats.Result{
Summary: stats.Summary{ExecTime: 1},
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
},
expected: logqlmodel.Result{
Statistics: stats.Result{
Summary: stats.Summary{ExecTime: 1},
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
Data: logqlmodel.Streams{{
Labels: `{foo="bar"}`,
@ -144,7 +144,7 @@ func TestResponseToResult(t *testing.T) {
desc: "LokiPromResponse",
input: &LokiPromResponse{
Statistics: stats.Result{
Summary: stats.Summary{ExecTime: 1},
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
Response: &queryrange.PrometheusResponse{
Data: queryrange.PrometheusData{
@ -154,7 +154,7 @@ func TestResponseToResult(t *testing.T) {
},
expected: logqlmodel.Result{
Statistics: stats.Result{
Summary: stats.Summary{ExecTime: 1},
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
Data: sampleStreamToMatrix(testSampleStreams()),
},
@ -310,7 +310,7 @@ func TestInstanceDownstream(t *testing.T) {
}},
},
Statistics: stats.Result{
Summary: stats.Summary{ExecTime: 1},
Summary: stats.Summary{QueueTime: 1, ExecTime: 2},
},
}
}

@ -49,6 +49,7 @@ var emptyStats = `"stats": {
},
"summary": {
"bytesProcessedPerSecond": 0,
"queueTime": 0,
"execTime": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed":0,

@ -102,8 +102,9 @@ func StatsCollectorMiddleware() queryrange.Middleware {
}
}
if statistics != nil {
// Re-calculate the summary then log and record metrics for the current query
statistics.ComputeSummary(time.Since(start))
// Re-calculate the summary: the queueTime result is already merged so should not be updated
// Log and record metrics for the current query
statistics.ComputeSummary(time.Since(start), 0)
statistics.Log(level.Debug(logger))
}
ctxValue := ctx.Value(ctxKey)

@ -5,6 +5,7 @@ import (
"flag"
"io"
"net/http"
"net/textproto"
"sync"
"time"
@ -37,6 +38,7 @@ import (
lokiutil "github.com/grafana/loki/pkg/util"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
lokihttpreq "github.com/grafana/loki/pkg/util/httpreq"
)
var (
@ -247,7 +249,7 @@ type schedulerRequest struct {
request *httpgrpc.HTTPRequest
statsEnabled bool
enqueueTime time.Time
queueTime time.Time
ctx context.Context
ctxCancel context.CancelFunc
@ -396,7 +398,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
req.parentSpanContext = parentSpanContext
req.queueSpan, req.ctx = opentracing.StartSpanFromContextWithTracer(ctx, tracer, "queued", opentracing.ChildOf(parentSpanContext))
req.enqueueTime = now
req.queueTime = now
req.ctxCancel = cancel
// aggregate the max queriers limit in the case of a multi tenant query
@ -462,9 +464,16 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
r := req.(*schedulerRequest)
s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds())
reqQueueTime := time.Since(r.queueTime)
s.queueDuration.Observe(reqQueueTime.Seconds())
r.queueSpan.Finish()
// Add HTTP header to the request containing the query queue time
r.request.Headers = append(r.request.Headers, &httpgrpc.Header{
Key: textproto.CanonicalMIMEHeaderKey(string(lokihttpreq.QueryQueueTimeHTTPHeader)),
Values: []string{reqQueueTime.String()},
})
/*
We want to dequeue the next unexpired request from the chosen tenant queue.
The chance of choosing a particular tenant for dequeueing is (1/active_tenants).

@ -1665,7 +1665,7 @@ func Benchmark_store_OverlappingChunks(b *testing.B) {
b.Fatal(err)
}
}
r := statsCtx.Result(time.Since(start))
r := statsCtx.Result(time.Since(start), 0)
b.Log("Total chunks:" + fmt.Sprintf("%d", r.TotalChunksRef()))
b.Log("Total bytes decompressed:" + fmt.Sprintf("%d", r.TotalDecompressedBytes()))
}

@ -4,6 +4,7 @@ import (
"context"
"net/http"
"regexp"
"time"
"github.com/weaveworks/common/middleware"
)
@ -16,6 +17,7 @@ var (
QueryTagsHTTPHeader ctxKey = "X-Query-Tags"
safeQueryTags = regexp.MustCompile("[^a-zA-Z0-9-=, ]+") // only alpha-numeric, ' ', ',', '=' and `-`
QueryQueueTimeHTTPHeader ctxKey = "X-Query-Queue-Time"
)
func ExtractQueryTagsMiddleware() middleware.Interface {
@ -33,3 +35,22 @@ func ExtractQueryTagsMiddleware() middleware.Interface {
})
})
}
func ExtractQueryMetricsMiddleware() middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
queueTimeHeader := req.Header.Get(string(QueryQueueTimeHTTPHeader))
if queueTimeHeader != "" {
queueTime, err := time.ParseDuration(queueTimeHeader)
if err == nil {
ctx = context.WithValue(ctx, QueryQueueTimeHTTPHeader, queueTime)
req = req.WithContext(ctx)
}
}
next.ServeHTTP(w, req)
})
})
}

@ -4,6 +4,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -49,3 +50,44 @@ func TestQueryTags(t *testing.T) {
})
}
}
func TestQueryMetrics(t *testing.T) {
for _, tc := range []struct {
desc string
in string
exp interface{}
error bool
}{
{
desc: "valid time duration",
in: `2s`,
exp: 2 * time.Second,
},
{
desc: "empty header",
in: ``,
exp: nil,
},
{
desc: "invalid time duration",
in: `foo`,
exp: nil,
},
} {
t.Run(tc.desc, func(t *testing.T) {
req := httptest.NewRequest("GET", "http://testing.com", nil)
req.Header.Set(string(QueryQueueTimeHTTPHeader), tc.in)
w := httptest.NewRecorder()
checked := false
mware := ExtractQueryMetricsMiddleware().Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
require.Equal(t, tc.exp, req.Context().Value(QueryQueueTimeHTTPHeader))
checked = true
}))
mware.ServeHTTP(w, req)
assert.True(t, true, checked)
})
}
}

@ -80,6 +80,7 @@ var queryTests = []struct {
},
"summary": {
"bytesProcessedPerSecond": 0,
"queueTime": 0,
"execTime": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed":0,

@ -86,6 +86,7 @@ var queryTests = []struct {
},
"summary": {
"bytesProcessedPerSecond": 0,
"queueTime": 0,
"execTime": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed":0,
@ -193,6 +194,7 @@ var queryTests = []struct {
},
"summary": {
"bytesProcessedPerSecond": 0,
"queueTime": 0,
"execTime": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed":0,
@ -317,6 +319,7 @@ var queryTests = []struct {
},
"summary": {
"bytesProcessedPerSecond": 0,
"queueTime": 0,
"execTime": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed":0,

Loading…
Cancel
Save