loki/pkg/distributor/distributor.go


package distributor
import (
"context"
"flag"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
lru "github.com/hashicorp/golang-lru"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
const (
// ShardLbName is the internal label to be used by Loki when dividing a stream into smaller pieces.
// Possible values are only increasing integers starting from 0.
ShardLbName = "__stream_shard__"
ShardLbPlaceholder = "__placeholder__"
ringKey = "distributor"
)
var (
maxLabelCacheSize = 100000
rfStats = usagestats.NewInt("distributor_replication_factor")
)
type ShardStreamsConfig struct {
Enabled bool `yaml:"enabled"`
LoggingEnabled bool `yaml:"logging_enabled"`
// DesiredRate is the threshold used to shard the stream into smaller pieces.
// Expected to be in bytes.
DesiredRate flagext.ByteSize `yaml:"desired_rate"`
}
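// RegisterFlagsWithPrefix registers the stream-sharding flags, all prefixed with the given prefix.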
func (cfg *ShardStreamsConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
fs.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Automatically shard streams to keep them under the per-stream rate limit")
fs.BoolVar(&cfg.LoggingEnabled, prefix+".logging-enabled", false, "Enable logging when sharding streams")
fs.Var(&cfg.DesiredRate, prefix+".desired-rate", "Threshold used to cut a new shard. The default (3MB) means that when a stream's rate exceeds 3MB, it will be sharded.")
}
// Config for a Distributor.
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
// For testing.
factory ring_client.PoolFactory `yaml:"-"`
// ShardStreams configures whether big streams should be sharded or not.
ShardStreams ShardStreamsConfig `yaml:"shard_streams"`
}
// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.ShardStreams.RegisterFlagsWithPrefix("distributor.shard-streams", fs)
}
// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStore interface {
RateFor(stream *logproto.Stream) (int, error)
}
// Distributor coordinates the replication and distribution of log streams.
type Distributor struct {
services.Service
cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
tenantsRetention *retention.TenantsRetention
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
rateStore RateStore
// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
distributorsLifecycler *ring.Lifecycler
rateLimitStrat string
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
labelCache *lru.Cache
// metrics
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardingFailures *prometheus.CounterVec
}
// New creates a Distributor.
func New(
cfg Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
overrides *validation.Overrides,
registerer prometheus.Registerer,
) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}
}
validator, err := NewValidator(overrides)
if err != nil {
return nil, err
}
// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifecycler *ring.Lifecycler
rateLimitStrat := validation.LocalIngestionRateStrategy
var servs []services.Service
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
rateLimitStrat = validation.GlobalIngestionRateStrategy
if err != nil {
return nil, errors.Wrap(err, "create distributor KV store client")
}
distributorsLifecycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, false, util_log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
if err != nil {
return nil, errors.Wrap(err, "create distributor lifecycler")
}
servs = append(servs, distributorsLifecycler)
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsLifecycler)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
}
labelCache, err := lru.New(maxLabelCacheSize)
if err != nil {
return nil, err
}
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
distributorsLifecycler: distributorsLifecycler,
validator: validator,
pool: clientpool.NewPool(clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
labelCache: labelCache,
rateLimitStrat: rateLimitStrat,
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_appends_total",
Help: "The total number of batch appends sent to ingesters.",
}, []string{"ingester"}),
ingesterAppendFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_append_failures_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"}),
replicationFactor: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "distributor_replication_factor",
Help: "The configured replication factor.",
}),
streamShardingFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "stream_sharding_failures",
Help: "Total number of failures when sharding a stream",
}, []string{
"reason",
}),
}
d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
rfStats.Set(int64(ingestersRing.ReplicationFactor()))
servs = append(servs, d.pool)
d.subservices, err = services.NewManager(servs...)
if err != nil {
return nil, errors.Wrap(err, "services manager")
}
d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
d.rateStore = &noopRateStore{}
return &d, nil
}
func (d *Distributor) starting(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
}
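// running blocks until the context is cancelled or a watched subservice fails,
// in which case the failure is returned as the distributor's error.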
func (d *Distributor) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
}
func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
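// streamTracker tracks how many ingester appends of a single stream have succeeded or
// failed, and how many of each are required before the push of that stream is settled.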
type streamTracker struct {
stream logproto.Stream
minSuccess int
maxFailures int
succeeded atomic.Int32
failed atomic.Int32
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
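// pushTracker tracks the outcome of a whole push request across all of its streams,
// signalling completion or the first fatal error over its channels.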
type pushTracker struct {
streamsPending atomic.Int32
streamsFailed atomic.Int32
done chan struct{}
err chan error
}
// Push a set of streams.
// The returned error is the last one seen.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
// Return early if request does not contain any streams
if len(req.Streams) == 0 {
return &logproto.PushResponse{}, nil
}
// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
validatedLineSize := 0
validatedLineCount := 0
var validationErr error
validationContext := d.validator.getValidationContextForTime(time.Now(), userID)
for _, stream := range req.Streams {
// Return early if stream does not contain any entries
if len(stream.Entries) == 0 {
continue
}
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)
stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
validationErr = err
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, userID).Add(float64(len(stream.Entries)))
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes))
continue
}
n := 0
streamSize := 0
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
validationErr = err
continue
}
stream.Entries[n] = entry
// If configured for this tenant, increment duplicate timestamps. Note, this is imperfect:
// because Loki accepts out-of-order writes, it doesn't account for separate
// pushes with overlapping time ranges having entries with duplicate timestamps.
if validationContext.incrementDuplicateTimestamps && n != 0 {
// Traditional logic for Loki is that 2 lines with the same timestamp and
// exact same content will be de-duplicated (i.e. only one will be stored, others dropped).
// To maintain this behavior, only increment the timestamp if the log content is different.
if stream.Entries[n-1].Line != entry.Line {
stream.Entries[n].Timestamp = maxT(entry.Timestamp, stream.Entries[n-1].Timestamp.Add(1*time.Nanosecond))
}
}
n++
validatedLineSize += len(entry.Line)
validatedLineCount++
streamSize += len(entry.Line)
}
stream.Entries = stream.Entries[:n]
if d.cfg.ShardStreams.Enabled {
derivedKeys, derivedStreams := d.shardStream(stream, streamSize, userID)
keys = append(keys, derivedKeys...)
streams = append(streams, derivedStreams...)
} else {
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{stream: stream})
}
}
// Return early if none of the streams contained entries
if len(streams) == 0 {
return &logproto.PushResponse{}, validationErr
}
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedLineSize) {
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedLineSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, userID, int(d.ingestionRateLimiter.Limit(now, userID)), validatedLineCount, validatedLineSize)
}
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
for i, key := range keys {
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil)
if err != nil {
return nil, err
}
streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
streams[i].maxFailures = replicationSet.MaxErrors
for _, ingester := range replicationSet.Instances {
streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streams[i])
ingesterDescs[ingester.Addr] = ingester
}
}
tracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendStreams() only sends once on each
err: make(chan error, 1),
}
tracker.streamsPending.Store(int32(len(streams)))
for ingester, streams := range streamsByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
d.sendStreams(localCtx, ingester, samples, &tracker)
}(ingesterDescs[ingester], streams)
}
select {
case err := <-tracker.err:
return nil, err
case <-tracker.done:
return &logproto.PushResponse{}, validationErr
case <-ctx.Done():
return nil, ctx.Err()
}
}
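// min returns the smaller of the two given ints.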
func min(x1, x2 int) int {
if x1 < x2 {
return x1
}
return x2
}
// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.
func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID string) ([]uint32, []streamTracker) {
shardCount := d.shardCountFor(&stream, streamSize, d.cfg.ShardStreams.DesiredRate.Val(), d.rateStore)
if shardCount <= 1 {
return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}}
}
if d.cfg.ShardStreams.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "sharding request", "stream", stream.Labels, "shard_count", shardCount)
}
streamLabels := labelTemplate(stream.Labels)
streamPattern := streamLabels.String()
derivedKeys := make([]uint32, 0, shardCount)
derivedStreams := make([]streamTracker, 0, shardCount)
for i := 0; i < shardCount; i++ {
shard, ok := d.createShard(stream, streamLabels, streamPattern, shardCount, i)
if !ok {
level.Error(util_log.Logger).Log("msg", "couldn't create shard", "stream", stream.Labels, "idx", i)
continue
}
derivedKeys = append(derivedKeys, util.TokenFor(userID, shard.Labels))
derivedStreams = append(derivedStreams, streamTracker{stream: shard})
if d.cfg.ShardStreams.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
}
}
return derivedKeys, derivedStreams
}
// labelTemplate returns a label set that includes a placeholder shard label to be replaced later.
// To avoid allocations, this slice is reused once the shard value is known.
func labelTemplate(lbls string) labels.Labels {
baseLbls, err := syntax.ParseLabels(lbls)
if err != nil {
level.Error(util_log.Logger).Log("msg", "couldn't extract labels from stream", "stream", lbls)
return nil
}
streamLabels := make([]labels.Label, len(baseLbls)+1)
for i := 0; i < len(baseLbls); i++ {
streamLabels[i] = baseLbls[i]
}
streamLabels[len(baseLbls)] = labels.Label{Name: ShardLbName, Value: ShardLbPlaceholder}
return streamLabels
}
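// createShard builds the shard numbered shardNumber out of the given stream. It slices the
// stream's entries to the bounds computed by boundsFor and derives the shard's label set by
// substituting the shard number into the placeholder of the precomputed label template.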
func (d *Distributor) createShard(stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber)
if !ok {
return logproto.Stream{}, false
}
shardLabel := strconv.Itoa(shardNumber)
lbls[len(lbls)-1] = labels.Label{Name: ShardLbName, Value: shardLabel}
return logproto.Stream{
Labels: strings.Replace(streamPattern, ShardLbPlaceholder, shardLabel, 1),
Hash: lbls.Hash(),
Entries: stream.Entries[lowerBound:upperBound],
}, true
}
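// boundsFor returns the half-open range [lowerBound, upperBound) of stream entries that
// belongs to the given shard, splitting the entries evenly across totalShards. For example,
// a stream with 10 entries split into 3 shards yields the ranges [0:3], [3:6] and [6:10].
// It returns false when the computed bounds are inverted, which callers treat as a failed shard.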
func (d *Distributor) boundsFor(stream logproto.Stream, totalShards, shardNumber int) (int, int, bool) {
entriesPerWindow := float64(len(stream.Entries)) / float64(totalShards)
fIdx := float64(shardNumber)
lowerBound := int(fIdx * entriesPerWindow)
upperBound := min(int(entriesPerWindow*(1+fIdx)), len(stream.Entries))
if lowerBound > upperBound {
if d.cfg.ShardStreams.LoggingEnabled {
level.Warn(util_log.Logger).Log("msg", "sharding with lowerbound > upperbound", "lowerbound", lowerBound, "upperbound", upperBound, "shards", totalShards, "labels", stream.Labels)
}
return 0, 0, false
}
return lowerBound, upperBound, true
}
// maxT returns the highest between two given timestamps.
func maxT(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t2
}
return t1
}
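// truncateLines truncates every entry of the stream to the tenant's maximum line size when
// truncation is enabled, and records the mutated samples and truncated bytes in the
// validation metrics.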
func (d *Distributor) truncateLines(vContext validationContext, stream *logproto.Stream) {
if !vContext.maxLineSizeTruncate {
return
}
var truncatedSamples, truncatedBytes int
for i, e := range stream.Entries {
if maxSize := vContext.maxLineSize; maxSize != 0 && len(e.Line) > maxSize {
stream.Entries[i].Line = e.Line[:maxSize]
truncatedSamples++
truncatedBytes += len(e.Line) - maxSize
}
}
validation.MutatedSamples.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedSamples))
validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes))
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendStreamsErr(ctx, ingester, streamTrackers)
// If we succeed, decrement each stream's pending count by one.
// If we reach the required number of successful puts on this stream, then
// decrement the number of pending streams by one.
// If we successfully push all streams to min success ingesters, wake up the
// waiting rpc so it can return early. Similarly, track the number of errors,
// and if it exceeds maxFailures shortcut the waiting rpc.
//
// The use of atomic increments here guarantees only a single sendStreams
// goroutine will write to either channel.
for i := range streamTrackers {
if err != nil {
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
continue
}
if pushTracker.streamsFailed.Inc() == 1 {
pushTracker.err <- err
}
} else {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if pushTracker.streamsPending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
}
}
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
c, err := d.pool.GetClientFor(ingester.Addr)
if err != nil {
return err
}
req := &logproto.PushRequest{
Streams: make([]logproto.Stream, len(streams)),
}
for i, s := range streams {
req.Streams[i] = s.stream
}
_, err = c.(logproto.PusherClient).Push(ctx, req)
d.ingesterAppends.WithLabelValues(ingester.Addr).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
}
return err
}
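// parseStreamLabels parses and validates the raw label string of a stream and returns its
// normalized (sorted) form. Results are memoized in the distributor's LRU label cache,
// keyed by the raw label string.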
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, error) {
labelVal, ok := d.labelCache.Get(key)
if ok {
return labelVal.(string), nil
}
ls, err := syntax.ParseLabels(key)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
}
// ensure labels are correctly sorted.
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", err
}
lsVal := ls.String()
d.labelCache.Add(key, lsVal)
return lsVal, nil
}
// shardCountFor returns the right number of shards to be used by the given stream.
//
// It combines the stream's current rate, fetched from the rate store, with the size of the
// incoming push to decide how many shards are needed to keep the stream under the desired
// rate. The result is capped at the number of entries in the stream.
//
// desiredRate is expected to be given in bytes.
func (d *Distributor) shardCountFor(stream *logproto.Stream, streamSize, desiredRate int, rateStore RateStore) int {
rate, err := rateStore.RateFor(stream)
if err != nil {
d.streamShardingFailures.WithLabelValues("rate_not_found").Inc()
if d.cfg.ShardStreams.LoggingEnabled {
level.Error(util_log.Logger).Log("msg", "couldn't shard stream because rate wasn't found", "stream", stream.Labels)
}
return 1
}
shards := calculateShards(rate, streamSize, desiredRate)
if shards > len(stream.Entries) {
d.streamShardingFailures.WithLabelValues("too_many_shards").Inc()
if d.cfg.ShardStreams.LoggingEnabled {
level.Error(util_log.Logger).Log("msg", "number of shards bigger than number of entries", "stream", stream.Labels, "shards", shards, "entries", len(stream.Entries))
}
return len(stream.Entries)
}
if shards == 0 {
// 1 shard is enough for the given stream.
return 1
}
return shards
}
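// calculateShards returns how many shards are needed so that the stream's current rate plus
// the size of the incoming push stays at or below the desired rate. For example, with
// rate=4MB, streamSize=2MB and desiredRate=3MB, (4+2)/3 = 2 shards. The result is rounded
// up, and anything at or below 1 collapses to a single shard.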
func calculateShards(rate, streamSize, desiredRate int) int {
shards := float64(rate+streamSize) / float64(desiredRate)
if shards <= 1 {
return 1
}
return int(math.Ceil(shards))
}