implements json stage metrics

pull/640/head
Cyril Tovena 6 years ago committed by Ed
parent ee0b502cc6
commit 6c7d5645c9
  1. 71
      pkg/logentry/counters.go
  2. 25
      pkg/logentry/metric/counters.go
  3. 5
      pkg/logentry/metric/counters_test.go
  4. 27
      pkg/logentry/metric/entries.go
  5. 25
      pkg/logentry/metric/gauges.go
  6. 26
      pkg/logentry/metric/histograms.go
  7. 46
      pkg/logentry/metric/metricvec.go
  8. 2
      pkg/logentry/stages/extensions.go
  9. 167
      pkg/logentry/stages/json.go
  10. 6
      pkg/logentry/stages/json_test.go
  11. 103
      pkg/logentry/stages/metrics.go
  12. 8
      pkg/logentry/stages/stage.go
  13. 4
      pkg/logentry/stages/util_test.go

@ -1,71 +0,0 @@
package logentry
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/util"
)
type counters struct {
name, help string
mtx sync.Mutex
counters map[model.Fingerprint]prometheus.Counter
}
func newCounters(name, help string) *counters {
return &counters{
counters: map[model.Fingerprint]prometheus.Counter{},
help: help,
name: name,
}
}
func (c *counters) Describe(ch chan<- *prometheus.Desc) {}
func (c *counters) Collect(ch chan<- prometheus.Metric) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, m := range c.counters {
ch <- m
}
}
func (c *counters) With(labels model.LabelSet) prometheus.Counter {
c.mtx.Lock()
defer c.mtx.Unlock()
fp := labels.Fingerprint()
var ok bool
var counter prometheus.Counter
if counter, ok = c.counters[fp]; !ok {
counter = prometheus.NewCounter(prometheus.CounterOpts{
Help: c.help,
Name: c.name,
ConstLabels: util.ModelLabelSetToMap(labels),
})
c.counters[fp] = counter
}
return counter
}
func logCount(reg prometheus.Registerer) api.EntryHandler {
c := newCounters("log_entries_total", "the total count of log entries")
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
c.With(labels).Inc()
return nil
})
}
func logSize(reg prometheus.Registerer) api.EntryHandler {
c := newCounters("log_entries_bytes", "the total count of bytes")
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
c.With(labels).Add(float64(len(entry)))
return nil
})
}

@ -0,0 +1,25 @@
package metric
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
type Counters struct {
*metricVec
}
func NewCounters(name, help string) *Counters {
return &Counters{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewCounter(prometheus.CounterOpts{
Help: help,
Name: name,
ConstLabels: labels,
})
})}
}
func (c *Counters) With(labels model.LabelSet) prometheus.Counter {
return c.metricVec.With(labels).(prometheus.Counter)
}

@ -1,4 +1,4 @@
package logentry
package metric
import (
"strings"
@ -22,9 +22,8 @@ log_entries_total{bar="foo",foo="bar"} 5.0
func Test_logCount(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
handler := logCount(reg)
handler := LogCount(reg)
workerCount := 5
var wg sync.WaitGroup

@ -0,0 +1,27 @@
package metric
import (
"time"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
func LogCount(reg prometheus.Registerer) api.EntryHandler {
c := NewCounters("log_entries_total", "the total count of log entries")
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
c.With(labels).Inc()
return nil
})
}
func LogSize(reg prometheus.Registerer) api.EntryHandler {
c := NewCounters("log_entries_bytes", "the total count of bytes")
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
c.With(labels).Add(float64(len(entry)))
return nil
})
}

@ -0,0 +1,25 @@
package metric
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
type Gauges struct {
*metricVec
}
func NewGauges(name, help string) *Gauges {
return &Gauges{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewGauge(prometheus.GaugeOpts{
Help: help,
Name: name,
ConstLabels: labels,
})
})}
}
func (g *Gauges) With(labels model.LabelSet) prometheus.Gauge {
return g.metricVec.With(labels).(prometheus.Gauge)
}

@ -0,0 +1,26 @@
package metric
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
type Histograms struct {
*metricVec
}
func NewHistograms(name, help string, buckets []float64) *Histograms {
return &Histograms{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewHistogram(prometheus.HistogramOpts{
Help: help,
Name: name,
ConstLabels: labels,
Buckets: buckets,
})
})}
}
func (h *Histograms) With(labels model.LabelSet) prometheus.Histogram {
return h.metricVec.With(labels).(prometheus.Histogram)
}

@ -0,0 +1,46 @@
package metric
import (
"sync"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
type metricVec struct {
factory func(labels map[string]string) prometheus.Metric
mtx sync.Mutex
metrics map[model.Fingerprint]prometheus.Metric
}
func newMetricVec(factory func(labels map[string]string) prometheus.Metric) *metricVec {
return &metricVec{
metrics: map[model.Fingerprint]prometheus.Metric{},
factory: factory,
}
}
func (c *metricVec) Describe(ch chan<- *prometheus.Desc) {}
func (c *metricVec) Collect(ch chan<- prometheus.Metric) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, m := range c.metrics {
ch <- m
}
}
func (c *metricVec) With(labels model.LabelSet) prometheus.Metric {
c.mtx.Lock()
defer c.mtx.Unlock()
fp := labels.Fingerprint()
var ok bool
var metric prometheus.Metric
if metric, ok = c.metrics[fp]; !ok {
metric = c.factory(util.ModelLabelSetToMap(labels))
c.metrics[fp] = metric
}
return metric
}

@ -20,7 +20,7 @@ func NewDocker(logger log.Logger) (Stage, error) {
"source": "log",
},
}
return NewJSON(logger, config)
return NewJSON(logger, config, nil)
}
// NewCRI creates a CRI format specific pipeline stage

@ -10,6 +10,7 @@ import (
"github.com/jmespath/go-jmespath"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
@ -34,6 +35,7 @@ type JSONConfig struct {
Timestamp *JSONTimestamp `mapstructure:"timestamp"`
Output *JSONOutput `mapstructure:"output"`
Labels map[string]*JSONLabel `mapstructure:"labels"`
Metrics MetricsConfig `mapstructure:"metrics"`
}
func newJSONConfig(config interface{}) (*JSONConfig, error) {
@ -90,6 +92,16 @@ func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) {
}
}
}
// metrics expressions.
for _, mcfg := range c.Metrics {
if mcfg.Source != nil {
expressions[*mcfg.Source], err = jmespath.Compile(*mcfg.Source)
if err != nil {
return nil, errors.Wrap(err, "could not compile output source jmespath expression")
}
}
}
return expressions, nil
}
@ -100,7 +112,7 @@ type jsonStage struct {
}
// NewJSON creates a new json stage from a config.
func NewJSON(logger log.Logger, config interface{}) (Stage, error) {
func NewJSON(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg, err := newJSONConfig(config)
if err != nil {
return nil, err
@ -109,67 +121,32 @@ func NewJSON(logger log.Logger, config interface{}) (Stage, error) {
if err != nil {
return nil, err
}
return &jsonStage{
return withMetric(&jsonStage{
cfg: cfg,
expressions: expressions,
logger: log.With(logger, "component", "parser", "type", "json"),
}, nil
}
func (j *jsonStage) getJSONString(expr *string, fallback string, data map[string]interface{}) (result string, ok bool) {
if expr == nil {
result, ok = data[fallback].(string)
if !ok {
level.Debug(j.logger).Log("msg", "field is not a string", "field", fallback)
}
} else {
var searchResult interface{}
searchResult, ok = j.getJSONValue(expr, data)
if !ok {
level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr)
return
}
result, ok = searchResult.(string)
if !ok {
level.Debug(j.logger).Log("msg", "search result is not a string", "expr", *expr)
}
}
return
}
func (j *jsonStage) getJSONValue(expr *string, data map[string]interface{}) (result interface{}, ok bool) {
var err error
ok = true
result, err = j.expressions[*expr].Search(data)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to search with jmespath expression", "expr", expr)
ok = false
return
}
return
}, cfg.Metrics, registerer), nil
}
// Process implement a pipeline stage
func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) Valuer {
if entry == nil {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
return
return nil
}
var data map[string]interface{}
if err := json.Unmarshal([]byte(*entry), &data); err != nil {
level.Debug(j.logger).Log("msg", "could not unmarshal json", "err", err)
return
valuer, err := newJSONValuer(*entry, j.expressions)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to create json valuer", "err", err)
return nil
}
// parsing ts
if j.cfg.Timestamp != nil {
if ts, ok := j.getJSONString(j.cfg.Timestamp.Source, "timestamp", data); ok {
parsedTs, err := time.Parse(j.cfg.Timestamp.Format, ts)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to parse time", "err", err, "format", j.cfg.Timestamp.Format, "value", ts)
} else {
*t = parsedTs
}
ts, err := parseTimestamp(valuer, j.cfg.Timestamp)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to parse timestamp", "err", err)
} else {
*t = ts
}
}
@ -179,8 +156,9 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string)
if lSrc != nil {
src = lSrc.Source
}
lValue, ok := j.getJSONString(src, lName, data)
if !ok {
lValue, err := valuer.getJSONString(src, lName)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to get json string", "err", err)
continue
}
labelValue := model.LabelValue(lValue)
@ -194,17 +172,82 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string)
// parsing output
if j.cfg.Output != nil {
if jsonObj, ok := j.getJSONValue(j.cfg.Output.Source, data); ok && jsonObj != nil {
if s, ok := jsonObj.(string); ok {
*entry = s
return
}
b, err := json.Marshal(jsonObj)
if err != nil {
level.Debug(j.logger).Log("msg", "could not marshal output value", "err", err)
return
}
*entry = string(b)
output, err := parseOutput(valuer, j.cfg.Output)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to parse output", "err", err)
} else {
*entry = output
}
}
return valuer
}
type jsonValuer struct {
exprmap map[string]*jmespath.JMESPath
data map[string]interface{}
}
func newJSONValuer(entry string, exprmap map[string]*jmespath.JMESPath) (*jsonValuer, error) {
var data map[string]interface{}
if err := json.Unmarshal([]byte(entry), &data); err != nil {
return nil, err
}
return &jsonValuer{
data: data,
exprmap: exprmap,
}, nil
}
func (v *jsonValuer) Value(expr *string) (interface{}, error) {
return v.exprmap[*expr].Search(v.data)
}
func (v *jsonValuer) getJSONString(expr *string, fallback string) (result string, err error) {
var ok bool
if expr == nil {
result, ok = v.data[fallback].(string)
if !ok {
return result, fmt.Errorf("%s is not a string but %T", fallback, v.data[fallback])
}
return
}
var searchResult interface{}
if searchResult, err = v.Value(expr); err != nil {
return
}
if result, ok = searchResult.(string); !ok {
return result, fmt.Errorf("%s is not a string but %T", *expr, searchResult)
}
return
}
func parseOutput(v Valuer, cfg *JSONOutput) (string, error) {
jsonObj, err := v.Value(cfg.Source)
if err != nil {
return "", errors.Wrap(err, "failed to fetch json value")
}
if jsonObj == nil {
return "", errors.New("json value is nil")
}
if s, ok := jsonObj.(string); ok {
return s, nil
}
b, err := json.Marshal(jsonObj)
if err != nil {
return "", errors.Wrap(err, "could not marshal output value")
}
return string(b), nil
}
func parseTimestamp(v *jsonValuer, cfg *JSONTimestamp) (time.Time, error) {
ts, err := v.getJSONString(cfg.Source, "timestamp")
if err != nil {
return time.Time{}, err
}
parsedTs, err := time.Parse(cfg.Format, ts)
if err != nil {
return time.Time{}, errors.Wrapf(err, "failed to parse time format:%s value:%s", cfg.Format, ts)
}
return parsedTs, nil
}

@ -54,10 +54,6 @@ func TestYamlMapStructure(t *testing.T) {
}
}
func String(s string) *string {
return &s
}
func TestJSONConfig_validate(t *testing.T) {
t.Parallel()
tests := map[string]struct {
@ -316,7 +312,7 @@ func TestJSONParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewJSON(util.Logger, tt.config)
p, err := NewJSON(util.Logger, tt.config, nil)
if err != nil {
t.Fatalf("failed to create json parser: %s", err)
}

@ -0,0 +1,103 @@
package stages
import (
"strings"
"time"
"github.com/grafana/loki/pkg/logentry/metric"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
const (
MetricTypeCounter = "counter"
MetricTypeGauge = "gauge"
MetricTypeHistogram = "histogram"
)
type MetricConfig struct {
MetricType string `mapstructure:"type"`
Description string `mapstructure:"description"`
Source *string `mapstructure:"source"`
Buckets []float64 `mapstructure:"buckets"`
}
type MetricsConfig map[string]MetricConfig
type Valuer interface {
Value(source *string) (interface{}, error)
}
type StageValuer interface {
Process(labels model.LabelSet, time *time.Time, entry *string) Valuer
}
func withMetric(s StageValuer, cfg MetricsConfig, registry prometheus.Registerer) Stage {
if registry == nil {
return StageFunc(func(labels model.LabelSet, time *time.Time, entry *string) {
_ = s.Process(labels, time, entry)
})
}
metricStage := newMetric(cfg, registry)
return StageFunc(func(labels model.LabelSet, time *time.Time, entry *string) {
valuer := s.Process(labels, time, entry)
if valuer != nil {
metricStage.process(valuer, labels)
}
})
}
func newMetric(cfgs MetricsConfig, registry prometheus.Registerer) *metricStage {
metrics := map[string]prometheus.Collector{}
for name, config := range cfgs {
var collector prometheus.Collector
switch strings.ToLower(config.MetricType) {
case MetricTypeCounter:
collector = metric.NewCounters(name, config.Description)
case MetricTypeGauge:
collector = metric.NewGauges(name, config.Description)
case MetricTypeHistogram:
collector = metric.NewHistograms(name, config.Description, config.Buckets)
}
if collector != nil {
registry.MustRegister(collector)
metrics[name] = collector
}
}
return &metricStage{
cfg: cfgs,
metrics: metrics,
}
}
type metricStage struct {
cfg MetricsConfig
metrics map[string]prometheus.Collector
}
func (m *metricStage) process(v Valuer, labels model.LabelSet) {
for name, collector := range m.metrics {
switch vec := collector.(type) {
case metric.Counters:
recordCounter(vec.With(labels), v, m.cfg[name])
case metric.Gauges:
recordGauge(vec.With(labels), v, m.cfg[name])
case metric.Histograms:
recordHistogram(vec.With(labels), v, m.cfg[name])
}
}
}
func recordCounter(counter prometheus.Counter, v Valuer, cfg MetricConfig) {
}
func recordGauge(counter prometheus.Gauge, v Valuer, cfg MetricConfig) {
}
func recordHistogram(counter prometheus.Histogram, v Valuer, cfg MetricConfig) {
}

@ -11,3 +11,11 @@ import (
type Stage interface {
Process(labels model.LabelSet, time *time.Time, entry *string)
}
// StageFunc is modelled on http.HandlerFunc.
type StageFunc func(labels model.LabelSet, time *time.Time, entry *string)
// Process implements EntryHandler.
func (s StageFunc) Process(labels model.LabelSet, time *time.Time, entry *string) {
s(labels, time, entry)
}

@ -37,3 +37,7 @@ func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) {
assert.Equal(t, model.LabelValue(v), gotV, "mismatch label value")
}
}
func String(s string) *string {
return &s
}

Loading…
Cancel
Save