chore: Call `shardstreams.Config` by value instead of by reference (#12915)

Use `shardstreams.Config` by value instead of by reference to fix docs generation.
Our `docs-generator` tool relies on struct addresses/references to determine whether flags are present. Using this config by value fixes it.
pull/12745/merge
Dylan Guedes 2 years ago committed by GitHub
parent 19fef9355f
commit afd9e36306
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 16
      docs/sources/shared/configuration.md
  2. 6
      pkg/distributor/distributor.go
  3. 2
      pkg/distributor/limits.go
  4. 6
      pkg/distributor/ratestore_test.go
  5. 7
      pkg/distributor/shardstreams/config.go
  6. 2
      pkg/ingester/instance_test.go
  7. 2
      pkg/ingester/limiter.go
  8. 2
      pkg/loki/loki.go
  9. 5
      pkg/validation/limits.go

@@ -3305,12 +3305,22 @@ ruler_remote_write_sigv4_config:
# Deprecated: Use deletion_mode per tenant configuration instead.
[allow_deletes: <boolean>]
# Define streams sharding behavior.
shard_streams:
[enabled: <boolean>]
# Automatically shard streams to keep them under the per-stream rate limit.
# Sharding is dictated by the desired rate.
# CLI flag: -shard-streams.enabled
[enabled: <boolean> | default = true]
[logging_enabled: <boolean>]
# Whether to log sharding streams behavior or not. Not recommended for
# production environments.
# CLI flag: -shard-streams.logging-enabled
[logging_enabled: <boolean> | default = false]
[desired_rate: <int>]
# Threshold used to cut a new shard. Default (1536KB) means if a rate is above
# 1536KB/s, it will be sharded into two streams.
# CLI flag: -shard-streams.desired-rate
[desired_rate: <int> | default = 1536KB]
[blocked_queries: <blocked_query...>]

@@ -589,7 +589,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream)
}
func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg *shardstreams.Config, stream logproto.Stream) []KeyedStream {
func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []KeyedStream {
derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg)
for i := 0; i < len(stream.Entries); i++ {
@@ -601,7 +601,7 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in
return derivedStreams
}
func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg *shardstreams.Config) []KeyedStream {
func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []KeyedStream {
var (
streamLabels = labelTemplate(stream.Labels, d.logger)
streamPattern = streamLabels.String()
@@ -809,7 +809,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
// based on the rate stored in the rate store and will store the new evaluated number of shards.
//
// desiredRate is expected to be given in bytes.
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, pushSize int, tenantID string, streamShardcfg *shardstreams.Config) int {
func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream, pushSize int, tenantID string, streamShardcfg shardstreams.Config) int {
if streamShardcfg.DesiredRate.Val() <= 0 {
if streamShardcfg.LoggingEnabled {
level.Error(logger).Log("msg", "invalid desired rate", "desired_rate", streamShardcfg.DesiredRate.String())

@@ -25,7 +25,7 @@ type Limits interface {
DiscoverServiceName(userID string) []string
DiscoverLogLevels(userID string) bool
ShardStreams(userID string) *shardstreams.Config
ShardStreams(userID string) shardstreams.Config
IngestionRateStrategy() string
IngestionRateBytes(userID string) float64
IngestionBurstSizeBytes(userID string) int

@@ -341,15 +341,15 @@ type fakeOverrides struct {
func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits {
return map[string]*validation.Limits{
"ingester0": {
ShardStreams: &shardstreams.Config{
ShardStreams: shardstreams.Config{
Enabled: c.enabled,
},
},
}
}
func (c *fakeOverrides) ShardStreams(_ string) *shardstreams.Config {
return &shardstreams.Config{
func (c *fakeOverrides) ShardStreams(_ string) shardstreams.Config {
return shardstreams.Config{
Enabled: c.enabled,
}
}

@@ -7,12 +7,13 @@ import (
)
type Config struct {
Enabled bool `yaml:"enabled" json:"enabled"`
LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled"`
Enabled bool `yaml:"enabled" json:"enabled" doc:"description=Automatically shard streams to keep them under the per-stream rate limit. Sharding is dictated by the desired rate."`
LoggingEnabled bool `yaml:"logging_enabled" json:"logging_enabled" doc:"description=Whether to log sharding streams behavior or not. Not recommended for production environments."`
// DesiredRate is the threshold used to shard the stream into smaller pieces.
// Expected to be in bytes.
DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate"`
DesiredRate flagext.ByteSize `yaml:"desired_rate" json:"desired_rate" doc:"description=Threshold used to cut a new shard. Default (1536KB) means if a rate is above 1536KB/s, it will be sharded into two streams."`
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {

@@ -1049,7 +1049,7 @@ func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
func TestStreamShardingUsage(t *testing.T) {
setupCustomTenantLimit := func(perStreamLimit string) *validation.Limits {
shardStreamsCfg := &shardstreams.Config{Enabled: true, LoggingEnabled: true}
shardStreamsCfg := shardstreams.Config{Enabled: true, LoggingEnabled: true}
shardStreamsCfg.DesiredRate.Set("6MB") //nolint:errcheck
customTenantLimits := &validation.Limits{}

@@ -27,7 +27,7 @@ type Limits interface {
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
ShardStreams(userID string) *shardstreams.Config
ShardStreams(userID string) shardstreams.Config
}
// Limiter implements primitives to get the maximum number of streams

@@ -97,7 +97,7 @@ type Config struct {
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
CompactorHTTPClient compactorclient.HTTPConfig `yaml:"compactor_client,omitempty" doc:"hidden"`
CompactorGRPCClient compactorclient.GRPCConfig `yaml:"compactor_grpc_client,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config"`
Worker worker.Config `yaml:"frontend_worker,omitempty"`
TableManager index.TableManagerConfig `yaml:"table_manager,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`

@@ -187,7 +187,7 @@ type Limits struct {
// Deprecated
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes" doc:"deprecated|description=Use deletion_mode per tenant configuration instead."`
ShardStreams *shardstreams.Config `yaml:"shard_streams" json:"shard_streams"`
ShardStreams shardstreams.Config `yaml:"shard_streams" json:"shard_streams" doc:"description=Define streams sharding behavior."`
BlockedQueries []*validation.BlockedQuery `yaml:"blocked_queries,omitempty" json:"blocked_queries,omitempty"`
@@ -388,7 +388,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
),
)
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint")
@@ -900,7 +899,7 @@ func (o *Overrides) DeletionMode(userID string) string {
return o.getOverridesForUser(userID).DeletionMode
}
func (o *Overrides) ShardStreams(userID string) *shardstreams.Config {
func (o *Overrides) ShardStreams(userID string) shardstreams.Config {
return o.getOverridesForUser(userID).ShardStreams
}

Loading…
Cancel
Save