loki/pkg/distributor/distributor.go

package distributor
import (
"context"
"flag"
"math"
"math/rand"
"net/http"
"strconv"
"strings"
"time"
"github.com/grafana/loki/pkg/ingester"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
lru "github.com/hashicorp/golang-lru"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
const (
ringKey = "distributor"
)
var (
maxLabelCacheSize = 100000
rfStats = usagestats.NewInt("distributor_replication_factor")
)
// Config for a Distributor.
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
// For testing.
factory ring_client.PoolFactory `yaml:"-"`
RateStore RateStoreConfig `yaml:"rate_store"`
}
// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
}
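// Illustrative sketch (not part of the upstream file): one way the flags registered
// above could be wired into a Config by a caller that owns its own FlagSet. The
// function name and flag-set name are hypothetical.
func exampleLoadDistributorConfig(args []string) (Config, error) {
	var cfg Config
	fs := flag.NewFlagSet("distributor", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(args); err != nil {
		return Config{}, err
	}
	return cfg, nil
}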
// RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStore interface {
RateFor(streamHash uint64) int64
}
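// exampleFixedRateStore is an illustrative, hypothetical RateStore that reports the
// same rate for every stream hash. It is a sketch for tests or local wiring, not part
// of the upstream distributor; the real implementation is populated from ingesters.
type exampleFixedRateStore struct {
	rate int64 // bytes per second reported for every stream
}

// RateFor returns the fixed rate regardless of the stream hash.
func (s exampleFixedRateStore) RateFor(_ uint64) int64 { return s.rate }

// Compile-time check that the sketch satisfies the interface.
var _ RateStore = exampleFixedRateStore{}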
// Distributor coordinates the replication and distribution of log streams.
type Distributor struct {
services.Service
cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
tenantsRetention *retention.TenantsRetention
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
rateStore RateStore
// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
distributorsLifecycler *ring.Lifecycler
rateLimitStrat string
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
labelCache *lru.Cache
// metrics
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardingFailures *prometheus.CounterVec
streamShardCount prometheus.Counter
}
// New creates a Distributor.
func New(
cfg Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
overrides *validation.Overrides,
registerer prometheus.Registerer,
) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}
}
internalFactory := func(addr string) (ring_client.PoolClient, error) {
internalCfg := clientCfg
internalCfg.Internal = true
return client.New(internalCfg, addr)
}
validator, err := NewValidator(overrides)
if err != nil {
return nil, err
}
// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifecycler *ring.Lifecycler
rateLimitStrat := validation.LocalIngestionRateStrategy
var servs []services.Service
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
rateLimitStrat = validation.GlobalIngestionRateStrategy
if err != nil {
return nil, errors.Wrap(err, "create distributor KV store client")
}
distributorsLifecycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, false, util_log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
if err != nil {
return nil, errors.Wrap(err, "create distributor lifecycler")
}
servs = append(servs, distributorsLifecycler)
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsLifecycler)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
}
labelCache, err := lru.New(maxLabelCacheSize)
if err != nil {
return nil, err
}
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
distributorsLifecycler: distributorsLifecycler,
validator: validator,
pool: clientpool.NewPool(clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
labelCache: labelCache,
rateLimitStrat: rateLimitStrat,
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_appends_total",
Help: "The total number of batch appends sent to ingesters.",
}, []string{"ingester"}),
ingesterAppendFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_append_failures_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"}),
replicationFactor: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "distributor_replication_factor",
Help: "The configured replication factor.",
}),
streamShardingFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "stream_sharding_failures",
Help: "Total number of failures when sharding a stream",
}, []string{
"reason",
}),
streamShardCount: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "stream_sharding_count",
Help: "Total number of times the distributor has sharded streams",
}),
}
d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
rfStats.Set(int64(ingestersRing.ReplicationFactor()))
rs := NewRateStore(
d.cfg.RateStore,
ingestersRing,
clientpool.NewPool(
clientCfg.PoolConfig,
ingestersRing,
internalFactory,
util_log.Logger,
),
overrides,
registerer,
)
d.rateStore = rs
servs = append(servs, d.pool, rs)
d.subservices, err = services.NewManager(servs...)
if err != nil {
return nil, errors.Wrap(err, "services manager")
}
d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
return &d, nil
}
func (d *Distributor) starting(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
}
func (d *Distributor) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
}
func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
type streamTracker struct {
stream logproto.Stream
minSuccess int
maxFailures int
succeeded atomic.Int32
failed atomic.Int32
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
type pushTracker struct {
streamsPending atomic.Int32
streamsFailed atomic.Int32
done chan struct{}
err chan error
}
// Push a set of streams.
// The returned error is the last one seen.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
// Return early if request does not contain any streams
if len(req.Streams) == 0 {
return &logproto.PushResponse{}, nil
}
// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
validatedLineSize := 0
validatedLineCount := 0
var validationErr error
validationContext := d.validator.getValidationContextForTime(time.Now(), userID)
for _, stream := range req.Streams {
// Return early if stream does not contain any entries
if len(stream.Entries) == 0 {
continue
}
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)
stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
validationErr = err
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, userID).Add(float64(len(stream.Entries)))
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes))
continue
}
n := 0
streamSize := 0
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
validationErr = err
continue
}
stream.Entries[n] = entry
// If configured for this tenant, increment duplicate timestamps. Note that this is
// imperfect: because Loki accepts out-of-order writes, it doesn't account for separate
// pushes with overlapping time ranges that contain entries with duplicate timestamps.
if validationContext.incrementDuplicateTimestamps && n != 0 {
// Traditional logic for Loki is that 2 lines with the same timestamp and
// exact same content will be de-duplicated, (i.e. only one will be stored, others dropped)
// To maintain this behavior, only increment the timestamp if the log content is different
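// For example (illustrative values): three consecutive entries pushed at the same
// timestamp T with distinct lines end up at T, T+1ns and T+2ns, whereas an entry
// whose line exactly matches the previous one keeps timestamp T and is
// deduplicated downstream.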
if stream.Entries[n-1].Line != entry.Line {
stream.Entries[n].Timestamp = maxT(entry.Timestamp, stream.Entries[n-1].Timestamp.Add(1*time.Nanosecond))
}
}
n++
validatedLineSize += len(entry.Line)
validatedLineCount++
streamSize += len(entry.Line)
}
stream.Entries = stream.Entries[:n]
shardStreamsCfg := d.validator.Limits.ShardStreams(userID)
if shardStreamsCfg.Enabled {
derivedKeys, derivedStreams := d.shardStream(stream, streamSize, userID)
keys = append(keys, derivedKeys...)
streams = append(streams, derivedStreams...)
} else {
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{stream: stream})
}
}
// Return early if none of the streams contained entries
if len(streams) == 0 {
return &logproto.PushResponse{}, validationErr
}
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedLineSize) {
// Return a 429 to indicate to the client they are being rate limited
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedLineSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, userID, int(d.ingestionRateLimiter.Limit(now, userID)), validatedLineCount, validatedLineSize)
}
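// Illustrative example of the check above (numbers are hypothetical): with a
// per-tenant limit of 4MB/s, a push whose validated payload exceeds the tokens
// currently available in the limiter is rejected as a whole with HTTP 429, and the
// discarded line count and bytes are recorded under the RateLimited reason.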
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.InstanceDesc
streamsByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.InstanceDesc{}
for i, key := range keys {
replicationSet, err := d.ingestersRing.Get(key, ring.WriteNoExtend, descs[:0], nil, nil)
if err != nil {
return nil, err
}
streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors
streams[i].maxFailures = replicationSet.MaxErrors
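// Example: with the typical replication factor of 3 and MaxErrors of 1, each stream
// needs at least 2 successful ingester appends to be considered written, so a single
// failing ingester does not fail the push.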
for _, ingester := range replicationSet.Instances {
streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streams[i])
ingesterDescs[ingester.Addr] = ingester
}
}
tracker := pushTracker{
done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendStreams() only sends once on each
err: make(chan error, 1),
}
tracker.streamsPending.Store(int32(len(streams)))
for ingester, streams := range streamsByIngester {
go func(ingester ring.InstanceDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
defer cancel()
localCtx = user.InjectOrgID(localCtx, userID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
d.sendStreams(localCtx, ingester, samples, &tracker)
}(ingesterDescs[ingester], streams)
}
select {
case err := <-tracker.err:
return nil, err
case <-tracker.done:
return &logproto.PushResponse{}, validationErr
case <-ctx.Done():
return nil, ctx.Err()
}
}
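// examplePush is an illustrative sketch (not part of the upstream file) showing how a
// caller might invoke Push: the tenant ID is injected into the context and the payload
// is a single stream with one entry. The tenant name and labels are hypothetical.
func examplePush(ctx context.Context, d *Distributor) (*logproto.PushResponse, error) {
	ctx = user.InjectOrgID(ctx, "example-tenant")
	req := &logproto.PushRequest{
		Streams: []logproto.Stream{
			{
				Labels: `{app="example"}`,
				Entries: []logproto.Entry{
					{Timestamp: time.Now(), Line: "hello from the distributor sketch"},
				},
			},
		},
	}
	return d.Push(ctx, req)
}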
func min(x1, x2 int) int {
if x1 < x2 {
return x1
}
return x2
}
// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.
//
// The number of shards is limited by the number of entries: if the ideal number
// of shards for the current load is bigger than the number of entries, the entries
// are randomized across the shards to avoid hotspots.
// Ex: if the ideal number of shards for a stream is 8 but it only has 3 entries,
// the 3 entries are randomized between the 8 shards. This way we avoid the first
// 3 shards receiving all the load while the last 5 receive none.
func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID string) ([]uint32, []streamTracker) {
shardStreamsCfg := d.validator.Limits.ShardStreams(userID)
logger := log.With(util_log.WithUserID(userID, util_log.Logger), "stream", stream.Labels)
shardCount := d.shardCountFor(logger, &stream, streamSize, shardStreamsCfg)
if shardCount <= 1 {
return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}}
}
d.streamShardCount.Inc()
if shardStreamsCfg.LoggingEnabled {
level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount)
}
if shardCount > len(stream.Entries) {
shardsOrder := d.randomizeShardsOrder(shardCount)
return d.divideEntriesBetweenShards(logger, userID, len(stream.Entries), shardStreamsCfg, stream, shardsOrder)
}
return d.divideEntriesBetweenShards(logger, userID, shardCount, shardStreamsCfg, stream, nil)
}
func (d *Distributor) randomizeShardsOrder(shardCount int) []int {
shardsOrder := make([]int, 0, shardCount)
for i := 0; i < shardCount; i++ {
shardsOrder = append(shardsOrder, i)
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(shardsOrder), func(i, j int) { shardsOrder[i], shardsOrder[j] = shardsOrder[j], shardsOrder[i] })
return shardsOrder
}
func (d *Distributor) divideEntriesBetweenShards(logger log.Logger, userID string, shardCount int, shardStreamsCfg *shardstreams.Config, stream logproto.Stream, alternateShardsOrder []int) ([]uint32, []streamTracker) {
derivedKeys := make([]uint32, 0, shardCount)
derivedStreams := make([]streamTracker, 0, shardCount)
streamLabels := labelTemplate(stream.Labels)
streamPattern := streamLabels.String()
for i := 0; i < shardCount; i++ {
j := i
if len(alternateShardsOrder) > 0 {
j = alternateShardsOrder[i]
}
shard, ok := d.createShard(shardStreamsCfg, stream, streamLabels, streamPattern, shardCount, i, j)
if !ok {
level.Error(logger).Log("msg", "couldn't create shard", "idx", i)
continue
}
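// Derive a ring token for the new shard and track it alongside the other streams in this push.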
derivedKeys = append(derivedKeys, util.TokenFor(userID, shard.Labels))
derivedStreams = append(derivedStreams, streamTracker{stream: shard})
if shardStreamsCfg.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
}
}
return derivedKeys, derivedStreams
}
// labelTemplate returns a label set that includes a placeholder shard label to be replaced later.
// To avoid allocations, this slice is reused once the shard value for a stream is known.
func labelTemplate(lbls string) labels.Labels {
baseLbls, err := syntax.ParseLabels(lbls)
if err != nil {
level.Error(util_log.Logger).Log("msg", "couldn't extract labels from stream", "stream", lbls)
return nil
}
streamLabels := make([]labels.Label, len(baseLbls)+1)
copy(streamLabels, baseLbls)
streamLabels[len(baseLbls)] = labels.Label{Name: ingester.ShardLbName, Value: ingester.ShardLbPlaceholder}
return streamLabels
}
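// createShard builds a single shard of the given stream. It computes the entry window for
// position spot, stamps shardNumber into the reusable label template and the stream label
// pattern, and returns the derived stream; ok is false when no valid window exists.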
func (d *Distributor) createShard(streamshardCfg *shardstreams.Config, stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, spot, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, spot, streamshardCfg.LoggingEnabled)
if !ok {
return logproto.Stream{}, false
}
shardLabel := strconv.Itoa(shardNumber)
lbls[len(lbls)-1] = labels.Label{Name: ingester.ShardLbName, Value: shardLabel}
return logproto.Stream{
Labels: strings.Replace(streamPattern, ingester.ShardLbPlaceholder, shardLabel, 1),
Hash: lbls.Hash(),
Entries: stream.Entries[lowerBound:upperBound],
}, true
}
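// boundsFor returns the half-open [lowerBound, upperBound) range of stream entries that belongs
// to the shard at position idx when the entries are split evenly across totalShards.
// For example, 10 entries across 3 shards yields the windows [0,3), [3,6) and [6,10).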
func (d *Distributor) boundsFor(stream logproto.Stream, totalShards, idx int, loggingEnabled bool) (int, int, bool) {
entriesPerWindow := float64(len(stream.Entries)) / float64(totalShards)
fidx := float64(idx)
lowerBound := int(fidx * entriesPerWindow)
upperBound := min(int(entriesPerWindow*(1+fidx)), len(stream.Entries))
if lowerBound > upperBound {
if loggingEnabled {
level.Warn(util_log.Logger).Log("msg", "sharding with lowerbound > upperbound", "lowerbound", lowerBound, "upperbound", upperBound, "shards", totalShards, "labels", stream.Labels)
}
return 0, 0, false
}
return lowerBound, upperBound, true
}
// maxT returns the later of the two given timestamps.
func maxT(t1, t2 time.Time) time.Time {
if t1.Before(t2) {
return t2
}
return t1
}
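// truncateLines truncates entries that exceed the tenant's maximum line size when truncation is
// enabled, and records the number of mutated samples and bytes in the validation metrics.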
func (d *Distributor) truncateLines(vContext validationContext, stream *logproto.Stream) {
if !vContext.maxLineSizeTruncate {
return
}
var truncatedSamples, truncatedBytes int
for i, e := range stream.Entries {
if maxSize := vContext.maxLineSize; maxSize != 0 && len(e.Line) > maxSize {
stream.Entries[i].Line = e.Line[:maxSize]
truncatedSamples++
truncatedBytes += len(e.Line) - maxSize
}
}
validation.MutatedSamples.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedSamples))
validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes))
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendStreamsErr(ctx, ingester, streamTrackers)
// If we succeed, decrement each stream's pending count by one.
// If we reach the required number of successful puts on this stream, then
// decrement the number of pending streams by one.
// If we successfully push all streams to min success ingesters, wake up the
// waiting rpc so it can return early. Similarly, track the number of errors,
// and if it exceeds maxFailures shortcut the waiting rpc.
//
// The use of atomic increments here guarantees only a single sendStreams
// goroutine will write to either channel.
for i := range streamTrackers {
if err != nil {
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
continue
}
if pushTracker.streamsFailed.Inc() == 1 {
pushTracker.err <- err
}
} else {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if pushTracker.streamsPending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
}
}
}
// TODO taken from Cortex, see if we can refactor out a usable interface.
func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
c, err := d.pool.GetClientFor(ingester.Addr)
if err != nil {
return err
}
req := &logproto.PushRequest{
Streams: make([]logproto.Stream, len(streams)),
}
for i, s := range streams {
req.Streams[i] = s.stream
}
_, err = c.(logproto.PusherClient).Push(ctx, req)
d.ingesterAppends.WithLabelValues(ingester.Addr).Inc()
if err != nil {
d.ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
}
return err
}
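// labelData is the cached result of parsing a stream's label string, keyed by the raw label string.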
type labelData struct {
labels string
hash uint64
}
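// parseStreamLabels parses and validates the stream's label string, returning its normalized
// string form and hash. Results are cached per raw label string to avoid re-parsing hot streams.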
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, uint64, error) {
if val, ok := d.labelCache.Get(key); ok {
labelVal := val.(labelData)
return labelVal.labels, labelVal.hash, nil
}
ls, err := syntax.ParseLabels(key)
if err != nil {
return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
}
// validate the parsed labels; ParseLabels returns them already sorted.
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", 0, err
}
lsVal := ls.String()
lsHash := ls.Hash()
d.labelCache.Add(key, labelData{lsVal, lsHash})
return lsVal, lsHash, nil
}
// shardCountFor returns the number of shards to use for the given stream.
//
// The count is derived from the stream's current rate, looked up in the rate store, plus the
// size of the incoming push, divided by the per-stream desired rate.
//
// desiredRate is expected to be given in bytes.
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, streamSize int, streamShardcfg *shardstreams.Config) int {
if streamShardcfg.DesiredRate.Val() <= 0 {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String())
}
return 1
}
rate := d.rateStore.RateFor(stream.Hash)
shards := calculateShards(rate, streamSize, streamShardcfg.DesiredRate.Val())
if shards > len(stream.Entries) {
d.streamShardingFailures.WithLabelValues("too_many_shards").Inc()
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "number of shards bigger than number of entries", "shards", shards, "entries", len(stream.Entries))
}
// Cap the shard count at the number of entries so no derived shard is empty.
return len(stream.Entries)
}
if shards == 0 {
// 1 shard is enough for the given stream.
return 1
}
return shards
}
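// calculateShards returns how many shards are needed so that the stream's combined rate and
// incoming push size fit under the desired per-stream rate:
// ceil((rate + streamSize) / desiredRate), with a minimum of one shard.
// For example, rate+streamSize = 5MB against a 2MB desiredRate yields ceil(2.5) = 3 shards.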
func calculateShards(rate int64, streamSize, desiredRate int) int {
shards := float64(rate+int64(streamSize)) / float64(desiredRate)
if shards <= 1 {
return 1
}
return int(math.Ceil(shards))
}