[new] promtail: add readline rate limit (#5031)

* [new] promtail: add readline rate limit

* [new] promtail: add readline rate limit #5031

* [new] promtail: add readline rate limit #5031

* [new] promtail: add readline rate limit #5031

* [new] promtail: add readline rate limit #5031

* Update clients/pkg/promtail/limit/config.go

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update clients/pkg/promtail/limit/config.go

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4925/head
李国忠 3 years ago committed by GitHub
parent 9a7f5a22f6
commit 6b377b40d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      clients/cmd/promtail/promtail-local-limit-config.yaml
  2. 35
      clients/pkg/logentry/stages/pipeline.go
  3. 3
      clients/pkg/promtail/config/config.go
  4. 12
      clients/pkg/promtail/config/config_test.go
  5. 19
      clients/pkg/promtail/limit/config.go
  6. 4
      clients/pkg/promtail/promtail.go

@ -0,0 +1,22 @@
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://localhost:3100/loki/api/v1/push
scrape_configs:
- job_name: system
static_configs:
- targets:
- localhost
labels:
job: varlogs
__path__: /var/log/*log
limit_config:
readline_rate: 100
readline_burst: 200

@ -1,11 +1,13 @@
package stages
import (
"context"
"sync"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"github.com/grafana/loki/clients/pkg/promtail/api"
)
@ -16,11 +18,16 @@ type PipelineStages = []interface{}
// PipelineStage contains configuration for a single pipeline stage
type PipelineStage = map[interface{}]interface{}
var rateLimiter *rate.Limiter
var rateLimiterDrop bool
var rateLimiterDropReason = "global_rate_limiter_drop"
// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
type Pipeline struct {
logger log.Logger
stages []Stage
jobName *string
logger log.Logger
stages []Stage
jobName *string
dropCount *prometheus.CounterVec
}
// NewPipeline creates a new log entry pipeline from a configuration
@ -48,9 +55,10 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist
}
}
return &Pipeline{
logger: log.With(logger, "component", "pipeline"),
stages: st,
jobName: jobName,
logger: log.With(logger, "component", "pipeline"),
stages: st,
jobName: jobName,
dropCount: getDropCountMetric(registerer),
}, nil
}
@ -99,6 +107,16 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
go func() {
defer wg.Done()
for e := range pipelineOut {
if rateLimiter != nil {
if rateLimiterDrop {
if !rateLimiter.Allow() {
p.dropCount.WithLabelValues(rateLimiterDropReason).Inc()
continue
}
} else {
_ = rateLimiter.Wait(context.Background())
}
}
nextChan <- e.Entry
}
}()
@ -122,3 +140,8 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
func (p *Pipeline) Size() int {
return len(p.stages)
}
func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool) {
rateLimiter = rate.NewLimiter(rate.Limit(rateVal), burstVal)
rateLimiterDrop = drop
}

@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"github.com/grafana/loki/clients/pkg/promtail/limit"
yaml "gopkg.in/yaml.v2"
"github.com/grafana/loki/clients/pkg/promtail/client"
@ -24,6 +25,7 @@ type Config struct {
PositionsConfig positions.Config `yaml:"positions,omitempty"`
ScrapeConfig []scrapeconfig.Config `yaml:"scrape_configs,omitempty"`
TargetConfig file.Config `yaml:"target_config,omitempty"`
LimitConfig limit.Config `yaml:"limit_config,omitempty"`
}
// RegisterFlags with prefix registers flags where every name is prefixed by
@ -33,6 +35,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
c.ClientConfig.RegisterFlagsWithPrefix(prefix, f)
c.PositionsConfig.RegisterFlagsWithPrefix(prefix, f)
c.TargetConfig.RegisterFlagsWithPrefix(prefix, f)
c.LimitConfig.RegisterFlagsWithPrefix(prefix, f)
}
// RegisterFlags registers flags.

@ -33,6 +33,9 @@ scrape_configs:
- localhost
labels:
job: varlogs
limit_config:
readline_rate: 100
readline_burst: 200
`
func Test_Load(t *testing.T) {
@ -41,6 +44,15 @@ func Test_Load(t *testing.T) {
require.Nil(t, err)
}
func Test_RateLimitLoad(t *testing.T) {
var dst Config
err := yaml.Unmarshal([]byte(testFile), &dst)
require.Nil(t, err)
config := dst.LimitConfig
require.Equal(t, float64(100), config.ReadlineRate)
require.Equal(t, 200, config.ReadlineBurst)
}
func TestConfig_Setup(t *testing.T) {
for i, tt := range []struct {
in Config

@ -0,0 +1,19 @@
package limit
import (
"flag"
)
type Config struct {
ReadlineRate float64 `yaml:"readline_rate" json:"readline_rate"`
ReadlineBurst int `yaml:"readline_burst" json:"readline_burst"`
ReadlineRateEnabled bool `yaml:"readline_rate_enabled,omitempty" json:"readline_rate_enabled"`
ReadlineRateDrop bool `yaml:"readline_rate_drop,omitempty" json:"readline_rate_drop"`
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Float64Var(&cfg.ReadlineRate, prefix+"limit.readline-rate", 10000, "promtail readline Rate.")
f.IntVar(&cfg.ReadlineBurst, prefix+"limit.readline-burst", 10000, "promtail readline Burst.")
f.BoolVar(&cfg.ReadlineRateEnabled, prefix+"limit.readline-rate-enabled", false, "Set to false to disable readline rate limit.")
f.BoolVar(&cfg.ReadlineRateDrop, prefix+"limit.readline-rate-drop", true, "Set to true to drop log when rate limit.")
}

@ -5,6 +5,7 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/clients/pkg/promtail/client"
@ -57,6 +58,9 @@ func New(cfg config.Config, dryRun bool, opts ...Option) (*Promtail, error) {
cfg.Setup()
if cfg.LimitConfig.ReadlineRateEnabled {
stages.SetReadLineRateLimiter(cfg.LimitConfig.ReadlineRate, cfg.LimitConfig.ReadlineBurst, cfg.LimitConfig.ReadlineRateDrop)
}
var err error
if dryRun {
promtail.client, err = client.NewLogger(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfigs...)

Loading…
Cancel
Save