mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
349 lines
11 KiB
349 lines
11 KiB
package distributor
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"net/http"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
|
|
"github.com/cortexproject/cortex/pkg/ring"
|
|
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
|
|
cortex_util "github.com/cortexproject/cortex/pkg/util"
|
|
"github.com/cortexproject/cortex/pkg/util/limiter"
|
|
"github.com/cortexproject/cortex/pkg/util/services"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/opentracing/opentracing-go"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"github.com/weaveworks/common/httpgrpc"
|
|
"github.com/weaveworks/common/user"
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
|
|
|
"github.com/grafana/loki/pkg/ingester/client"
|
|
"github.com/grafana/loki/pkg/logproto"
|
|
"github.com/grafana/loki/pkg/util"
|
|
"github.com/grafana/loki/pkg/util/validation"
|
|
)
|
|
|
|
var (
|
|
ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "loki",
|
|
Name: "distributor_ingester_appends_total",
|
|
Help: "The total number of batch appends sent to ingesters.",
|
|
}, []string{"ingester"})
|
|
ingesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "loki",
|
|
Name: "distributor_ingester_append_failures_total",
|
|
Help: "The total number of failed batch appends sent to ingesters.",
|
|
}, []string{"ingester"})
|
|
|
|
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "loki",
|
|
Name: "distributor_bytes_received_total",
|
|
Help: "The total number of uncompressed bytes received per tenant",
|
|
}, []string{"tenant"})
|
|
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Namespace: "loki",
|
|
Name: "distributor_lines_received_total",
|
|
Help: "The total number of lines received per tenant",
|
|
}, []string{"tenant"})
|
|
)
|
|
|
|
// Config for a Distributor.
|
|
type Config struct {
|
|
// Distributors ring
|
|
DistributorRing cortex_distributor.RingConfig `yaml:"ring,omitempty"`
|
|
|
|
// For testing.
|
|
factory ring_client.PoolFactory `yaml:"-"`
|
|
}
|
|
|
|
// RegisterFlags registers the flags.
|
|
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
|
|
cfg.DistributorRing.RegisterFlags(f)
|
|
}
|
|
|
|
// Distributor coordinates replicates and distribution of log streams.
|
|
type Distributor struct {
|
|
services.Service
|
|
|
|
cfg Config
|
|
clientCfg client.Config
|
|
ingestersRing ring.ReadRing
|
|
validator *Validator
|
|
pool *ring_client.Pool
|
|
|
|
// The global rate limiter requires a distributors ring to count
|
|
// the number of healthy instances.
|
|
distributorsRing *ring.Lifecycler
|
|
|
|
subservices *services.Manager
|
|
subservicesWatcher *services.FailureWatcher
|
|
|
|
// Per-user rate limiter.
|
|
ingestionRateLimiter *limiter.RateLimiter
|
|
}
|
|
|
|
// New a distributor creates.
|
|
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
|
|
factory := cfg.factory
|
|
if factory == nil {
|
|
factory = func(addr string) (ring_client.PoolClient, error) {
|
|
return client.New(clientCfg, addr)
|
|
}
|
|
}
|
|
|
|
validator, err := NewValidator(overrides)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create the configured ingestion rate limit strategy (local or global).
|
|
var ingestionRateStrategy limiter.RateLimiterStrategy
|
|
var distributorsRing *ring.Lifecycler
|
|
|
|
var servs []services.Service
|
|
|
|
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
|
|
var err error
|
|
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
servs = append(servs, distributorsRing)
|
|
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsRing)
|
|
} else {
|
|
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
|
|
}
|
|
|
|
d := Distributor{
|
|
cfg: cfg,
|
|
clientCfg: clientCfg,
|
|
ingestersRing: ingestersRing,
|
|
distributorsRing: distributorsRing,
|
|
validator: validator,
|
|
pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
|
|
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
|
|
}
|
|
|
|
servs = append(servs, d.pool)
|
|
d.subservices, err = services.NewManager(servs...)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "services manager")
|
|
}
|
|
d.subservicesWatcher = services.NewFailureWatcher()
|
|
d.subservicesWatcher.WatchManager(d.subservices)
|
|
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
|
|
|
|
return &d, nil
|
|
}
|
|
|
|
func (d *Distributor) starting(ctx context.Context) error {
|
|
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
|
|
}
|
|
|
|
func (d *Distributor) running(ctx context.Context) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case err := <-d.subservicesWatcher.Chan():
|
|
return errors.Wrap(err, "distributor subservice failed")
|
|
}
|
|
}
|
|
|
|
func (d *Distributor) stopping(_ error) error {
|
|
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
|
|
}
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
type streamTracker struct {
|
|
stream *logproto.Stream
|
|
minSuccess int
|
|
maxFailures int
|
|
succeeded int32
|
|
failed int32
|
|
}
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
type pushTracker struct {
|
|
samplesPending int32
|
|
samplesFailed int32
|
|
done chan struct{}
|
|
err chan error
|
|
}
|
|
|
|
// Push a set of streams.
|
|
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
|
|
userID, err := user.ExtractOrgID(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Track metrics.
|
|
bytesCount := 0
|
|
lineCount := 0
|
|
for _, stream := range req.Streams {
|
|
for _, entry := range stream.Entries {
|
|
bytesCount += len(entry.Line)
|
|
lineCount++
|
|
}
|
|
}
|
|
bytesIngested.WithLabelValues(userID).Add(float64(bytesCount))
|
|
linesIngested.WithLabelValues(userID).Add(float64(lineCount))
|
|
|
|
// First we flatten out the request into a list of samples.
|
|
// We use the heuristic of 1 sample per TS to size the array.
|
|
// We also work out the hash value at the same time.
|
|
streams := make([]streamTracker, 0, len(req.Streams))
|
|
keys := make([]uint32, 0, len(req.Streams))
|
|
var validationErr error
|
|
validatedSamplesSize := 0
|
|
validatedSamplesCount := 0
|
|
|
|
for _, stream := range req.Streams {
|
|
if err := d.validator.ValidateLabels(userID, stream); err != nil {
|
|
validationErr = err
|
|
continue
|
|
}
|
|
|
|
entries := make([]logproto.Entry, 0, len(stream.Entries))
|
|
for _, entry := range stream.Entries {
|
|
if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
|
|
validationErr = err
|
|
continue
|
|
}
|
|
entries = append(entries, entry)
|
|
validatedSamplesSize += len(entry.Line)
|
|
validatedSamplesCount++
|
|
}
|
|
|
|
if len(entries) == 0 {
|
|
continue
|
|
}
|
|
stream.Entries = entries
|
|
keys = append(keys, util.TokenFor(userID, stream.Labels))
|
|
streams = append(streams, streamTracker{
|
|
stream: stream,
|
|
})
|
|
}
|
|
|
|
if len(streams) == 0 {
|
|
return &logproto.PushResponse{}, validationErr
|
|
}
|
|
|
|
now := time.Now()
|
|
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
|
|
// Return a 429 to indicate to the client they are being rate limited
|
|
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
|
|
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
|
|
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
|
|
}
|
|
|
|
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
|
|
var descs [maxExpectedReplicationSet]ring.IngesterDesc
|
|
|
|
samplesByIngester := map[string][]*streamTracker{}
|
|
ingesterDescs := map[string]ring.IngesterDesc{}
|
|
for i, key := range keys {
|
|
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
|
|
streams[i].maxFailures = replicationSet.MaxErrors
|
|
for _, ingester := range replicationSet.Ingesters {
|
|
samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i])
|
|
ingesterDescs[ingester.Addr] = ingester
|
|
}
|
|
}
|
|
|
|
tracker := pushTracker{
|
|
samplesPending: int32(len(streams)),
|
|
done: make(chan struct{}),
|
|
err: make(chan error),
|
|
}
|
|
for ingester, samples := range samplesByIngester {
|
|
go func(ingester ring.IngesterDesc, samples []*streamTracker) {
|
|
// Use a background context to make sure all ingesters get samples even if we return early
|
|
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
|
|
defer cancel()
|
|
localCtx = user.InjectOrgID(localCtx, userID)
|
|
if sp := opentracing.SpanFromContext(ctx); sp != nil {
|
|
localCtx = opentracing.ContextWithSpan(localCtx, sp)
|
|
}
|
|
d.sendSamples(localCtx, ingester, samples, &tracker)
|
|
}(ingesterDescs[ingester], samples)
|
|
}
|
|
select {
|
|
case err := <-tracker.err:
|
|
return nil, err
|
|
case <-tracker.done:
|
|
return &logproto.PushResponse{}, validationErr
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
|
|
err := d.sendSamplesErr(ctx, ingester, streamTrackers)
|
|
|
|
// If we succeed, decrement each sample's pending count by one. If we reach
|
|
// the required number of successful puts on this sample, then decrement the
|
|
// number of pending samples by one. If we successfully push all samples to
|
|
// min success ingesters, wake up the waiting rpc so it can return early.
|
|
// Similarly, track the number of errors, and if it exceeds maxFailures
|
|
// shortcut the waiting rpc.
|
|
//
|
|
// The use of atomic increments here guarantees only a single sendSamples
|
|
// goroutine will write to either channel.
|
|
for i := range streamTrackers {
|
|
if err != nil {
|
|
if atomic.AddInt32(&streamTrackers[i].failed, 1) <= int32(streamTrackers[i].maxFailures) {
|
|
continue
|
|
}
|
|
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
|
|
pushTracker.err <- err
|
|
}
|
|
} else {
|
|
if atomic.AddInt32(&streamTrackers[i].succeeded, 1) != int32(streamTrackers[i].minSuccess) {
|
|
continue
|
|
}
|
|
if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
|
|
pushTracker.done <- struct{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.IngesterDesc, streams []*streamTracker) error {
|
|
c, err := d.pool.GetClientFor(ingester.Addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req := &logproto.PushRequest{
|
|
Streams: make([]*logproto.Stream, len(streams)),
|
|
}
|
|
for i, s := range streams {
|
|
req.Streams[i] = s.stream
|
|
}
|
|
|
|
_, err = c.(logproto.PusherClient).Push(ctx, req)
|
|
ingesterAppends.WithLabelValues(ingester.Addr).Inc()
|
|
if err != nil {
|
|
ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Check implements the grpc healthcheck
|
|
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
|
|
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
|
|
}
|
|
|