Add retention label to the loki_distributor_bytes_received_total metric (#3840)

* Add a retention label to the loki_distributor_bytes_received_total metric.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update docs.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update docs/sources/operations/observability.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Move ParseRequest to an Apache2-licensed package.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add a retention label to the loki_distributor_bytes_received_total metric.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes bad merge.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes bad merge.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
Branch: pull/3928/head
Author: Cyril Tovena (committed via GitHub)
Parent: 5887096f3c
Commit: 22dcfcde33
7 changed files:

clients/pkg/promtail/targets/lokipush/pushtarget.go (4 lines changed)
docs/sources/operations/observability.md (2 lines changed)
pkg/distributor/distributor.go (19 lines changed)
pkg/distributor/http.go (5 lines changed)
pkg/loghttp/push/push.go (108 lines changed)
pkg/loghttp/push/push_test.go (25 lines changed)
pkg/storage/stores/shipper/compactor/retention/expiration.go (30 lines changed)
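
Before the per-file hunks, a minimal sketch (not part of the diff) of what the relocated API looks like from a caller's side, using a hypothetical handler name handlePush. Passing nil for the TenantsRetention, as the promtail push target does below, leaves the retention_hours label empty.

package example

import (
	"net/http"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/weaveworks/common/user"

	"github.com/grafana/loki/pkg/loghttp/push"
)

// handlePush is a hypothetical handler showing the relocated call site:
// ParseRequest now lives in pkg/loghttp/push and takes a TenantsRetention.
func handlePush(tr push.TenantsRetention) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		logger := util_log.WithContext(r.Context(), util_log.Logger)
		userID, _ := user.ExtractOrgID(r.Context())

		req, err := push.ParseRequest(logger, userID, r, tr)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		_ = req // hand the parsed request to the write path here
	}
}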

@@ -22,8 +22,8 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
)
type PushTarget struct {
@@ -107,7 +107,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := util.ParseRequest(logger, userID, r)
req, err := push.ParseRequest(logger, userID, r, nil)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)

@@ -23,7 +23,7 @@ The Loki Distributors expose the following metrics:
| ------------------------------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------ |
| `loki_distributor_ingester_appends_total` | Counter | The total number of batch appends sent to ingesters. |
| `loki_distributor_ingester_append_failures_total` | Counter | The total number of failed batch appends sent to ingesters. |
| `loki_distributor_bytes_received_total` | Counter | The total number of uncompressed bytes received per tenant. |
| `loki_distributor_bytes_received_total` | Counter | The total number of uncompressed bytes received per tenant and retention hours. |
| `loki_distributor_lines_received_total` | Counter | The total number of log _entries_ received per tenant (not necessarily of _lines_, as an entry can have more than one line of text). |
The Loki Ingesters expose the following metrics:
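
Note (illustrative, not part of the diff): with the new label, a scraped sample could look like

loki_distributor_bytes_received_total{tenant="fake", retention_hours="744"} 1.2e+06

where 744 is a 31-day retention period expressed in whole hours, and the label value is an empty string when no TenantsRetention is passed to ParseRequest (as in the promtail push target).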

@@ -27,13 +27,12 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/validation"
)
var (
maxLabelCacheSize = 100000
)
var maxLabelCacheSize = 100000
// Config for a Distributor.
type Config struct {
@@ -53,12 +52,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Distributor struct {
services.Service
cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
tenantsRetention *retention.TenantsRetention
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
@@ -118,6 +118,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
cfg: cfg,
clientCfg: clientCfg,
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
validator: validator,

@@ -6,17 +6,16 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
lokiutil "github.com/grafana/loki/pkg/util"
)
// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := lokiutil.ParseRequest(logger, userID, r)
req, err := push.ParseRequest(logger, userID, r, d.tenantsRetention)
if err != nil {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(

@@ -1,4 +1,4 @@
package util
package push
import (
"compress/gzip"
@@ -14,22 +14,24 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
loki_util "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/unmarshal"
unmarshal2 "github.com/grafana/loki/pkg/util/unmarshal/legacy"
)
var (
contentType = http.CanonicalHeaderKey("Content-Type")
contentEnc = http.CanonicalHeaderKey("Content-Encoding")
contentType = http.CanonicalHeaderKey("Content-Type")
contentEnc = http.CanonicalHeaderKey("Content-Encoding")
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
}, []string{"tenant", "retention_hours"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
@@ -39,12 +41,15 @@ var (
const applicationJSON = "application/json"
func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) {
type TenantsRetention interface {
RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
}
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
bodySize := NewSizeReader(r.Body)
bodySize := loki_util.NewSizeReader(r.Body)
contentEncoding := r.Header.Get(contentEnc)
switch contentEncoding {
case "":
@@ -66,48 +71,12 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.
}
contentType := r.Header.Get(contentType)
var req logproto.PushRequest
defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)
mostRecentEntry := time.Unix(0, 0)
for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
}
}
// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
bytesIngested.WithLabelValues(userID).Add(float64(entriesSize))
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}
level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"contentEncoding", contentEncoding,
"bodySize", humanize.Bytes(uint64(bodySize.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
)
}()
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
req logproto.PushRequest
)
switch contentType {
case applicationJSON:
@@ -132,5 +101,46 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.
return nil, err
}
}
mostRecentEntry := time.Unix(0, 0)
for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
var retentionHours string
if tenantsRetention != nil {
lbs, err := logql.ParseLabels(s.Labels)
if err != nil {
return nil, err
}
retentionHours = fmt.Sprintf("%d", int64(math.Floor(tenantsRetention.RetentionPeriodFor(userID, lbs).Hours())))
}
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line))))
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
}
}
// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}
level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"contentEncoding", contentEncoding,
"bodySize", humanize.Bytes(uint64(bodySize.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
)
return &req, nil
}
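
The tenantsRetention argument above only has to satisfy the small TenantsRetention interface declared in this file. Below is a minimal sketch of a stub (hypothetical, e.g. for tests) that reports the same period for every tenant and stream; in the distributor the concrete implementation is retention.TenantsRetention, wired up in distributor.go above.

package example

import (
	"time"

	"github.com/prometheus/prometheus/pkg/labels"
)

// fixedRetention is a hypothetical stub satisfying push.TenantsRetention:
// every tenant and label set gets the same retention period, so the
// retention_hours label value is constant.
type fixedRetention struct{ period time.Duration }

func (f fixedRetention) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration {
	return f.period
}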

@@ -1,4 +1,4 @@
package util
package push
import (
"bytes"
@@ -26,7 +26,7 @@ func gzipString(source string) string {
}
func TestParseRequest(t *testing.T) {
var tests = []struct {
tests := []struct {
path string
body string
contentType string
@@ -37,35 +37,42 @@ func TestParseRequest(t *testing.T) {
path: `/loki/api/v1/push`,
body: ``,
contentType: `application/json`,
valid: false},
valid: false,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: ``,
valid: false},
valid: false,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true},
valid: true,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
contentEncoding: ``,
valid: true},
valid: true,
},
{
path: `/loki/api/v1/push`,
body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json`,
contentEncoding: `gzip`,
valid: true},
valid: true,
},
{
path: `/loki/api/v1/push`,
body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json`,
contentEncoding: `snappy`,
valid: false}}
valid: false,
},
}
// Testing input array
for index, test := range tests {
@@ -76,7 +83,7 @@ func TestParseRequest(t *testing.T) {
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}
data, err := ParseRequest(util_log.Logger, "", request)
data, err := ParseRequest(util_log.Logger, "", request, nil)
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)

@@ -4,6 +4,7 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/validation"
)
@@ -13,7 +14,7 @@ type ExpirationChecker interface {
}
type expirationChecker struct {
limits Limits
tenantsRetention *TenantsRetention
}
type Limits interface {
@@ -23,15 +24,30 @@ type Limits interface {
func NewExpirationChecker(limits Limits) ExpirationChecker {
return &expirationChecker{
limits: limits,
tenantsRetention: NewTenantsRetention(limits),
}
}
// Expired tells if a ref chunk is expired based on retention rules.
func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) {
userID := unsafeGetString(ref.UserID)
streamRetentions := e.limits.StreamRetention(userID)
globalRetention := e.limits.RetentionPeriod(userID)
period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels)
return now.Sub(ref.Through) > period, nil
}
type TenantsRetention struct {
limits Limits
}
func NewTenantsRetention(l Limits) *TenantsRetention {
return &TenantsRetention{
limits: l,
}
}
func (tr *TenantsRetention) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration {
streamRetentions := tr.limits.StreamRetention(userID)
globalRetention := tr.limits.RetentionPeriod(userID)
var (
matchedRule validation.StreamRetention
found bool
@@ -39,7 +55,7 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod
Outer:
for _, streamRetention := range streamRetentions {
for _, m := range streamRetention.Matchers {
if !m.Matches(ref.Labels.Get(m.Name)) {
if !m.Matches(lbs.Get(m.Name)) {
continue Outer
}
}
@@ -58,7 +74,7 @@ Outer:
matchedRule = streamRetention
}
if found {
return now.Sub(ref.Through) > time.Duration(matchedRule.Period), nil
return time.Duration(matchedRule.Period)
}
return now.Sub(ref.Through) > globalRetention, nil
return globalRetention
}
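
With the per-stream matching factored out of Expired into TenantsRetention, other components can resolve a stream's effective retention directly. A small sketch, assuming a value satisfying the exported Limits interface is at hand (the distributor passes its overrides); the hypothetical helper mirrors how push.ParseRequest derives the retention_hours label value.

package example

import (
	"fmt"
	"math"

	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)

// effectiveRetentionHours resolves the retention for one stream: per-stream
// rules take precedence over the tenant's global retention, and the period
// is floored to whole hours, matching the retention_hours label value.
func effectiveRetentionHours(limits retention.Limits, userID string, lbs labels.Labels) string {
	tr := retention.NewTenantsRetention(limits)
	period := tr.RetentionPeriodFor(userID, lbs)
	return fmt.Sprintf("%d", int64(math.Floor(period.Hours())))
}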
