From 22dcfcde33874bfc332af975791457c9375ca55d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 1 Jul 2021 12:53:47 +0200 Subject: [PATCH] Add retention label to loki_distributor_bytes_received_total metrics (#3840) * Add retention label on the loki_distributor_bytes_received_total metrics. Signed-off-by: Cyril Tovena * Update docs. Signed-off-by: Cyril Tovena * Update docs/sources/operations/observability.md Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com> * Move ParseRequest to an apache2 package. Signed-off-by: Cyril Tovena * Add retention label on the loki_distributor_bytes_received_total metrics. Signed-off-by: Cyril Tovena * fixes bad merge. Signed-off-by: Cyril Tovena * Fixes. Signed-off-by: Cyril Tovena * Fixes bad merge. Signed-off-by: Cyril Tovena Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com> --- .../promtail/targets/lokipush/pushtarget.go | 4 +- docs/sources/operations/observability.md | 2 +- pkg/distributor/distributor.go | 19 +-- pkg/distributor/http.go | 5 +- pkg/{util/parse.go => loghttp/push/push.go} | 108 ++++++++++-------- .../push/push_test.go} | 25 ++-- .../shipper/compactor/retention/expiration.go | 30 +++-- 7 files changed, 113 insertions(+), 80 deletions(-) rename pkg/{util/parse.go => loghttp/push/push.go} (56%) rename pkg/{util/parse_test.go => loghttp/push/push_test.go} (89%) diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index 38660f01a8..42eabf94f3 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -22,8 +22,8 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" "github.com/grafana/loki/clients/pkg/promtail/targets/target" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/util" ) type PushTarget struct { @@ -107,7 +107,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := user.ExtractOrgID(r.Context()) - req, err := util.ParseRequest(logger, userID, r) + req, err := push.ParseRequest(logger, userID, r, nil) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/docs/sources/operations/observability.md b/docs/sources/operations/observability.md index 4d4b63379e..19dd3a6d68 100644 --- a/docs/sources/operations/observability.md +++ b/docs/sources/operations/observability.md @@ -23,7 +23,7 @@ The Loki Distributors expose the following metrics: | ------------------------------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------ | | `loki_distributor_ingester_appends_total` | Counter | The total number of batch appends sent to ingesters. | | `loki_distributor_ingester_append_failures_total` | Counter | The total number of failed batch appends sent to ingesters. | -| `loki_distributor_bytes_received_total` | Counter | The total number of uncompressed bytes received per tenant. | +| `loki_distributor_bytes_received_total` | Counter | The total number of uncompressed bytes received per both tenant and retention hours. 
| | `loki_distributor_lines_received_total` | Counter | The total number of log _entries_ received per tenant (not necessarily of _lines_, as an entry can have more than one line of text). | The Loki Ingesters expose the following metrics: diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 53d8090d71..80b1f51350 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -27,13 +27,12 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/runtime" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/validation" ) -var ( - maxLabelCacheSize = 100000 -) +var maxLabelCacheSize = 100000 // Config for a Distributor. type Config struct { @@ -53,12 +52,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Distributor struct { services.Service - cfg Config - clientCfg client.Config - tenantConfigs *runtime.TenantConfigs - ingestersRing ring.ReadRing - validator *Validator - pool *ring_client.Pool + cfg Config + clientCfg client.Config + tenantConfigs *runtime.TenantConfigs + tenantsRetention *retention.TenantsRetention + ingestersRing ring.ReadRing + validator *Validator + pool *ring_client.Pool // The global rate limiter requires a distributors ring to count // the number of healthy instances. @@ -118,6 +118,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in cfg: cfg, clientCfg: clientCfg, tenantConfigs: configs, + tenantsRetention: retention.NewTenantsRetention(overrides), ingestersRing: ingestersRing, distributorsRing: distributorsRing, validator: validator, diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 19eb76f661..b3ced2aa7d 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -6,17 +6,16 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - - lokiutil "github.com/grafana/loki/pkg/util" ) // PushHandler reads a snappy-compressed proto from the HTTP body. 
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := user.ExtractOrgID(r.Context()) - req, err := lokiutil.ParseRequest(logger, userID, r) + req, err := push.ParseRequest(logger, userID, r, d.tenantsRetention) if err != nil { if d.tenantConfigs.LogPushRequest(userID) { level.Debug(logger).Log( diff --git a/pkg/util/parse.go b/pkg/loghttp/push/push.go similarity index 56% rename from pkg/util/parse.go rename to pkg/loghttp/push/push.go index fb8f8b1d78..dcd0eceb64 100644 --- a/pkg/util/parse.go +++ b/pkg/loghttp/push/push.go @@ -1,4 +1,4 @@ -package util +package push import ( "compress/gzip" @@ -14,22 +14,24 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + loki_util "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/unmarshal" unmarshal2 "github.com/grafana/loki/pkg/util/unmarshal/legacy" ) var ( - contentType = http.CanonicalHeaderKey("Content-Type") - contentEnc = http.CanonicalHeaderKey("Content-Encoding") - + contentType = http.CanonicalHeaderKey("Content-Type") + contentEnc = http.CanonicalHeaderKey("Content-Encoding") bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "loki", Name: "distributor_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant", - }, []string{"tenant"}) + }, []string{"tenant", "retention_hours"}) linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "loki", Name: "distributor_lines_received_total", @@ -39,12 +41,15 @@ var ( const applicationJSON = "application/json" -func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) { +type TenantsRetention interface { + RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration +} +func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body - bodySize := NewSizeReader(r.Body) + bodySize := loki_util.NewSizeReader(r.Body) contentEncoding := r.Header.Get(contentEnc) switch contentEncoding { case "": @@ -66,48 +71,12 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto. } contentType := r.Header.Get(contentType) - var req logproto.PushRequest - - defer func() { - var ( - entriesSize int64 - streamLabelsSize int64 - totalEntries int64 - ) - - mostRecentEntry := time.Unix(0, 0) - - for _, s := range req.Streams { - streamLabelsSize += int64(len(s.Labels)) - for _, e := range s.Entries { - totalEntries++ - entriesSize += int64(len(e.Line)) - if e.Timestamp.After(mostRecentEntry) { - mostRecentEntry = e.Timestamp - } - } - } - - // incrementing tenant metrics if we have a tenant. 
- if totalEntries != 0 && userID != "" { - bytesIngested.WithLabelValues(userID).Add(float64(entriesSize)) - linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) - } - - level.Debug(logger).Log( - "msg", "push request parsed", - "path", r.URL.Path, - "contentType", contentType, - "contentEncoding", contentEncoding, - "bodySize", humanize.Bytes(uint64(bodySize.Size())), - "streams", len(req.Streams), - "entries", totalEntries, - "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), - "entriesSize", humanize.Bytes(uint64(entriesSize)), - "totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)), - "mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(), - ) - }() + var ( + entriesSize int64 + streamLabelsSize int64 + totalEntries int64 + req logproto.PushRequest + ) switch contentType { case applicationJSON: @@ -132,5 +101,46 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto. return nil, err } } + + mostRecentEntry := time.Unix(0, 0) + + for _, s := range req.Streams { + streamLabelsSize += int64(len(s.Labels)) + var retentionHours string + if tenantsRetention != nil { + lbs, err := logql.ParseLabels(s.Labels) + if err != nil { + return nil, err + } + retentionHours = fmt.Sprintf("%d", int64(math.Floor(tenantsRetention.RetentionPeriodFor(userID, lbs).Hours()))) + } + for _, e := range s.Entries { + totalEntries++ + entriesSize += int64(len(e.Line)) + bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line)))) + if e.Timestamp.After(mostRecentEntry) { + mostRecentEntry = e.Timestamp + } + } + } + + // incrementing tenant metrics if we have a tenant. + if totalEntries != 0 && userID != "" { + linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) + } + + level.Debug(logger).Log( + "msg", "push request parsed", + "path", r.URL.Path, + "contentType", contentType, + "contentEncoding", contentEncoding, + "bodySize", humanize.Bytes(uint64(bodySize.Size())), + "streams", len(req.Streams), + "entries", totalEntries, + "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), + "entriesSize", humanize.Bytes(uint64(entriesSize)), + "totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)), + "mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(), + ) return &req, nil } diff --git a/pkg/util/parse_test.go b/pkg/loghttp/push/push_test.go similarity index 89% rename from pkg/util/parse_test.go rename to pkg/loghttp/push/push_test.go index 8730e41f25..60e7ef67e8 100644 --- a/pkg/util/parse_test.go +++ b/pkg/loghttp/push/push_test.go @@ -1,4 +1,4 @@ -package util +package push import ( "bytes" @@ -26,7 +26,7 @@ func gzipString(source string) string { } func TestParseRequest(t *testing.T) { - var tests = []struct { + tests := []struct { path string body string contentType string @@ -37,35 +37,42 @@ func TestParseRequest(t *testing.T) { path: `/loki/api/v1/push`, body: ``, contentType: `application/json`, - valid: false}, + valid: false, + }, { path: `/loki/api/v1/push`, body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, contentType: ``, - valid: false}, + valid: false, + }, { path: `/loki/api/v1/push`, body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, contentType: `application/json`, - valid: true}, + valid: true, + }, { path: `/loki/api/v1/push`, body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, contentType: 
`application/json`, contentEncoding: ``, - valid: true}, + valid: true, + }, { path: `/loki/api/v1/push`, body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), contentType: `application/json`, contentEncoding: `gzip`, - valid: true}, + valid: true, + }, { path: `/loki/api/v1/push`, body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), contentType: `application/json`, contentEncoding: `snappy`, - valid: false}} + valid: false, + }, + } // Testing input array for index, test := range tests { @@ -76,7 +83,7 @@ func TestParseRequest(t *testing.T) { if len(test.contentEncoding) > 0 { request.Header.Add("Content-Encoding", test.contentEncoding) } - data, err := ParseRequest(util_log.Logger, "", request) + data, err := ParseRequest(util_log.Logger, "", request, nil) if test.valid { assert.Nil(t, err, "Should not give error for %d", index) assert.NotNil(t, data, "Should give data for %d", index) diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index ce4046fd18..1e2ccf87d5 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -4,6 +4,7 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/validation" ) @@ -13,7 +14,7 @@ type ExpirationChecker interface { } type expirationChecker struct { - limits Limits + tenantsRetention *TenantsRetention } type Limits interface { @@ -23,15 +24,30 @@ type Limits interface { func NewExpirationChecker(limits Limits) ExpirationChecker { return &expirationChecker{ - limits: limits, + tenantsRetention: NewTenantsRetention(limits), } } // Expired tells if a ref chunk is expired based on retention rules. func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) { userID := unsafeGetString(ref.UserID) - streamRetentions := e.limits.StreamRetention(userID) - globalRetention := e.limits.RetentionPeriod(userID) + period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels) + return now.Sub(ref.Through) > period, nil +} + +type TenantsRetention struct { + limits Limits +} + +func NewTenantsRetention(l Limits) *TenantsRetention { + return &TenantsRetention{ + limits: l, + } +} + +func (tr *TenantsRetention) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration { + streamRetentions := tr.limits.StreamRetention(userID) + globalRetention := tr.limits.RetentionPeriod(userID) var ( matchedRule validation.StreamRetention found bool @@ -39,7 +55,7 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod Outer: for _, streamRetention := range streamRetentions { for _, m := range streamRetention.Matchers { - if !m.Matches(ref.Labels.Get(m.Name)) { + if !m.Matches(lbs.Get(m.Name)) { continue Outer } } @@ -58,7 +74,7 @@ Outer: matchedRule = streamRetention } if found { - return now.Sub(ref.Through) > time.Duration(matchedRule.Period), nil + return time.Duration(matchedRule.Period) } - return now.Sub(ref.Through) > globalRetention, nil + return globalRetention }
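
A minimal, self-contained sketch of the idea behind this patch: resolve a per-stream retention period for a tenant, floor it to whole hours, and use that value as the retention_hours label on the distributor's bytes counter. This is not the actual Loki code — the streamRetention type, the equality-only selector, the 31-day default, and the example tenant/labels below are simplified assumptions; the real implementation uses validation.StreamRetention matchers and also ranks overlapping rules by priority and period.

package main

import (
	"fmt"
	"math"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// streamRetention is a simplified stand-in for validation.StreamRetention:
// a retention period that applies when every selector label matches the stream.
type streamRetention struct {
	period   time.Duration
	selector map[string]string // label name -> required value (equality-only matcher)
}

// retentionPeriodFor mirrors the shape of TenantsRetention.RetentionPeriodFor:
// return the first per-stream rule whose selector matches the stream labels,
// falling back to the tenant-wide retention period.
func retentionPeriodFor(global time.Duration, rules []streamRetention, lbs map[string]string) time.Duration {
Outer:
	for _, r := range rules {
		for name, want := range r.selector {
			if lbs[name] != want {
				continue Outer
			}
		}
		return r.period
	}
	return global
}

// Same metric name and label set as the patch; registered on the default registry.
var bytesReceived = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "loki",
	Name:      "distributor_bytes_received_total",
	Help:      "The total number of uncompressed bytes received per tenant and retention hours.",
}, []string{"tenant", "retention_hours"})

func main() {
	tenant := "fake"                                              // example tenant ID (assumption)
	lbs := map[string]string{"namespace": "dev", "app": "nginx"} // example stream labels (assumption)

	period := retentionPeriodFor(
		31*24*time.Hour, // assumed tenant-wide default retention
		[]streamRetention{{period: 24 * time.Hour, selector: map[string]string{"namespace": "dev"}}},
		lbs,
	)

	// Same formatting as the patch: whole hours, rounded down.
	retentionHours := fmt.Sprintf("%d", int64(math.Floor(period.Hours())))

	// One counter series per (tenant, retention_hours) pair.
	bytesReceived.WithLabelValues(tenant, retentionHours).Add(float64(len("fizzbuzz")))

	fmt.Println("retention_hours =", retentionHours) // prints 24 for the dev stream
}

Flooring the period to whole hours keeps the new label's cardinality low: each tenant adds at most one extra series per distinct retention period, not one per stream.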