inflight-logging: Add extra metadata to inflight requests logging (#11243)

**What this PR does / why we need it**:
logging: Add extra metadata to inflight requests

This adds extra metadata (similar to what we have in `metrics.go`) but
for queries in in-flight (both started and retrying)

Changes:
    Adds following data
    1. Query Hash
    2. Start and End time
    3. Start and End delta
    4. Length of the query
5. Moved the helper util to `queryutil` package because of cyclic
dependencies with `logql` package.
   
**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:
Find the screenshots of log entries looks like (both in `retry.go` and
`roundtrip.go`)

![Screenshot 2023-11-16 at 13 01
32](https://github.com/grafana/loki/assets/3735252/177e97ed-6ee8-41dd-b088-2e4f49562ba0)
![Screenshot 2023-11-16 at 13 02
15](https://github.com/grafana/loki/assets/3735252/fb328a37-dbe3-483e-b083-f21327858029)

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)

---------

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
pull/10932/head
Kaviraj Kanagaraj 2 years ago committed by GitHub
parent c716e498ad
commit 30d0030bf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      Makefile
  3. 3
      pkg/logql/blocker.go
  4. 5
      pkg/logql/blocker_test.go
  5. 2
      pkg/logql/engine.go
  6. 2
      pkg/logql/engine_test.go
  7. 18
      pkg/logql/metrics.go
  8. 7
      pkg/logql/metrics_test.go
  9. 20
      pkg/querier/queryrange/queryrangebase/retry.go
  10. 18
      pkg/querier/queryrange/roundtrip.go
  11. 8
      pkg/ruler/compat.go
  12. 4
      pkg/ruler/evaluator_jitter.go
  13. 4
      pkg/ruler/evaluator_remote.go
  14. 13
      pkg/util/hash_fp.go

@ -6,6 +6,7 @@
##### Enhancements
* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
* [11110](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Change the default of the `metrics-namespace` flag to 'loki'.
* [11086](https://github.com/grafana/loki/pull/11086) **kandrew5**: Helm: Allow topologySpreadConstraints
* [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace.

@ -42,6 +42,8 @@ BUILD_IMAGE_VERSION ?= 0.31.2
# Docker image info
IMAGE_PREFIX ?= grafana
BUILD_IMAGE_PREFIX ?= grafana
IMAGE_TAG ?= $(shell ./tools/image-tag)
# Version info for binaries
@ -102,7 +104,7 @@ RM := --rm
TTY := --tty
DOCKER_BUILDKIT ?= 1
BUILD_IMAGE = BUILD_IMAGE=$(IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
BUILD_IMAGE = BUILD_IMAGE=$(BUILD_IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
PUSH_OCI=docker push
TAG_OCI=docker tag
ifeq ($(CI), true)

@ -8,6 +8,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
"github.com/grafana/loki/pkg/util"
logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/validation"
)
@ -43,7 +44,7 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
for _, b := range blocks {
if b.Hash > 0 {
if b.Hash == HashedQuery(query) {
if b.Hash == util.HashedQuery(query) {
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query)
return qb.block(b, typ, logger)
}

@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@ -124,7 +125,7 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
"correct FNV32 hash matches",
defaultQuery, []*validation.BlockedQuery{
{
Hash: HashedQuery(defaultQuery),
Hash: util.HashedQuery(defaultQuery),
},
}, logqlmodel.ErrBlocked,
},
@ -132,7 +133,7 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
"incorrect FNV32 hash does not match",
defaultQuery, []*validation.BlockedQuery{
{
Hash: HashedQuery(defaultQuery) + 1,
Hash: util.HashedQuery(defaultQuery) + 1,
},
}, nil,
},

@ -219,7 +219,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
)
if q.logExecQuery {
queryHash := HashedQuery(q.params.Query())
queryHash := util.HashedQuery(q.params.Query())
if GetRangeType(q.params) == InstantType {
level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash)
} else {

@ -2669,7 +2669,7 @@ func TestHashingStability(t *testing.T) {
{`sum (count_over_time({app="myapp",env="myenv"} |= "error" |= "metrics.go" | logfmt [10s])) by(query_hash)`},
} {
params.qs = test.qs
expectedQueryHash := HashedQuery(test.qs)
expectedQueryHash := util.HashedQuery(test.qs)
// check that both places will end up having the same query hash, even though they're emitting different log lines.
require.Regexp(t,

@ -2,7 +2,6 @@ package logql
import (
"context"
"hash/fnv"
"strconv"
"strings"
"time"
@ -19,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
@ -120,7 +120,7 @@ func RecordRangeAndInstantQueryMetrics(
logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_hash", HashedQuery(p.Query()),
"query_hash", util.HashedQuery(p.Query()),
"query_type", queryType,
"range_type", rt,
"length", p.End().Sub(p.Start()),
@ -187,12 +187,6 @@ func RecordRangeAndInstantQueryMetrics(
recordUsageStats(queryType, stats)
}
func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}
func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
@ -225,7 +219,7 @@ func RecordLabelQueryMetrics(
"status", status,
"label", label,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)
@ -276,7 +270,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"query_hash", HashedQuery(PrintMatches(match)),
"query_hash", util.HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)
if shard != nil {
@ -316,7 +310,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)
level.Info(logger).Log(logValues...)
@ -346,7 +340,7 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),

@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -191,11 +192,11 @@ func Test_testToKeyValues(t *testing.T) {
}
func TestQueryHashing(t *testing.T) {
h1 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h2 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`)
h1 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h2 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`)
// check that it capture differences of order.
require.NotEqual(t, h1, h2)
h3 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
h3 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`)
// check that it evaluate same queries as same hashes, even if evaluated at different timestamps.
require.Equal(t, h1, h3)
}

@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -73,6 +74,11 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
MaxRetries: 0,
}
bk := backoff.New(ctx, cfg)
start := req.GetStart()
end := req.GetEnd()
query := req.GetQuery()
for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
@ -86,7 +92,19 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok || httpResp.Code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "query", req.GetQuery(), "retry_in", bk.NextDelay(), "err", err)
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
"try", tries,
"query", query,
"query_hash", util.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"retry_in", bk.NextDelay(),
"err", err,
)
bk.Wait()
continue
}

@ -24,6 +24,7 @@ import (
base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
logutil "github.com/grafana/loki/pkg/util/log"
)
@ -247,8 +248,19 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryHash := logql.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "range", "query", op.Query, "length", op.EndTs.Sub(op.StartTs), "step", op.Step, "query_hash", queryHash)
queryHash := util.HashedQuery(op.Query)
level.Info(logger).Log(
"msg", "executing query",
"type", "range",
"query", op.Query,
"start", op.StartTs.Format(time.RFC3339Nano),
"end", op.EndTs.Format(time.RFC3339Nano),
"start_delta", time.Since(op.StartTs),
"end_delta", time.Since(op.EndTs),
"length", op.EndTs.Sub(op.StartTs),
"step", op.Step,
"query_hash", queryHash,
)
switch e := expr.(type) {
case syntax.SampleExpr:
@ -296,7 +308,7 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryHash := logql.HashedQuery(op.Query)
queryHash := util.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "instant", "query", op.Query, "query_hash", queryHash)
switch expr.(type) {

@ -24,11 +24,11 @@ import (
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/template"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/ruler/rulespb"
"github.com/grafana/loki/pkg/ruler/util"
rulerutil "github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/util"
)
// RulesLimits is the one function we need from limits.Overrides, and
@ -40,7 +40,7 @@ type RulesLimits interface {
RulerRemoteWriteURL(userID string) string
RulerRemoteWriteTimeout(userID string) time.Duration
RulerRemoteWriteHeaders(userID string) map[string]string
RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig
RulerRemoteWriteRelabelConfigs(userID string) []*rulerutil.RelabelConfig
RulerRemoteWriteConfig(userID string, id string) *config.RemoteWriteConfig
RulerRemoteWriteQueueCapacity(userID string) int
RulerRemoteWriteQueueMinShards(userID string) int
@ -60,7 +60,7 @@ type RulesLimits interface {
// and passing an altered timestamp.
func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
hash := logql.HashedQuery(qs)
hash := util.HashedQuery(qs)
detail := rules.FromOriginContext(ctx)
detailLog := log.With(logger, "rule_name", detail.Name, "rule_type", detail.Kind, "query", qs, "query_hash", hash)

@ -10,8 +10,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
)
// EvaluatorWithJitter wraps a given Evaluator. It applies a consistent jitter based on a rule's query string by hashing
@ -44,7 +44,7 @@ func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, hasher has
}
func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
logger := log.With(e.logger, "query", qs, "query_hash", logql.HashedQuery(qs))
logger := log.With(e.logger, "query", qs, "query_hash", util.HashedQuery(qs))
jitter := e.calculateJitter(qs, logger)
if jitter > 0 {

@ -36,8 +36,8 @@ import (
"google.golang.org/grpc/keepalive"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
@ -220,7 +220,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
args.Set("time", ts.Format(time.RFC3339Nano))
}
body := []byte(args.Encode())
hash := logql.HashedQuery(query)
hash := util.HashedQuery(query)
req := httpgrpc.HTTPRequest{
Method: http.MethodPost,

@ -1,6 +1,10 @@
package util
import "github.com/prometheus/common/model"
import (
"hash/fnv"
"github.com/prometheus/common/model"
)
// HashFP simply moves entropy from the most significant 48 bits of the
// fingerprint into the least significant 16 bits (by XORing) so that a simple
@ -12,3 +16,10 @@ import "github.com/prometheus/common/model"
func HashFP(fp model.Fingerprint) uint32 {
return uint32(fp ^ (fp >> 32) ^ (fp >> 16))
}
// HashedQuery returns a unique hash value for the given `query`.
func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}

Loading…
Cancel
Save