feat(dataobj): Ensure constant sharding for the dataobj querier (#16273)

pull/16274/head^2
Cyril Tovena 3 months ago committed by GitHub
parent 49a69b2e64
commit 78a141c6a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      docs/sources/shared/configuration.md
  2. 15
      pkg/dataobj/querier/store.go
  3. 13
      pkg/loki/modules.go
  4. 2
      pkg/querier/queryrange/limits.go
  5. 2
      pkg/querier/queryrange/roundtrip.go
  6. 7
      pkg/storage/config/schema_config.go

@ -821,6 +821,10 @@ dataobj:
# CLI flag: -dataobj-querier-from
[from: <daytime> | default = 1970-01-01]
# The number of shards to use for the dataobj querier.
# CLI flag: -dataobj-querier-shard-factor
[shard_factor: <int> | default = 32]
# The prefix to use for the storage bucket.
# CLI flag: -dataobj-storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]

@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
@ -60,13 +61,15 @@ var (
)
type Config struct {
Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."`
From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."`
Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."`
From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."`
ShardFactor int `yaml:"shard_factor" doc:"description=The number of shards to use for the dataobj querier."`
}
func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.Enabled, "dataobj-querier-enabled", false, "Enable the dataobj querier.")
f.Var(&c.From, "dataobj-querier-from", "The start time to query from.")
f.IntVar(&c.ShardFactor, "dataobj-querier-shard-factor", 32, "The number of shards to use for the dataobj querier.")
}
func (c *Config) Validate() error {
@ -76,6 +79,14 @@ func (c *Config) Validate() error {
return nil
}
func (c *Config) PeriodConfig() config.PeriodConfig {
return config.PeriodConfig{
From: c.From,
RowShards: uint32(c.ShardFactor),
Schema: "v13",
}
}
// Store implements querier.Store for querying data objects.
type Store struct {
bucket objstore.Bucket

@ -11,6 +11,7 @@ import (
"net/http/httputil"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
@ -1056,13 +1057,23 @@ func (i ingesterQueryOptions) QueryIngestersWithin() time.Duration {
func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")
schemas := t.Cfg.SchemaConfig
// Adjust schema config to use constant sharding for the timerange of dataobj querier.
if t.Cfg.DataObj.Querier.Enabled {
schemas = schemas.Clone()
schemas.Configs = append(schemas.Configs, t.Cfg.DataObj.Querier.PeriodConfig())
sort.Slice(schemas.Configs, func(i, j int) bool {
return schemas.Configs[i].From.UnixNano() < schemas.Configs[j].From.UnixNano()
})
}
middleware, stopper, err := queryrange.NewMiddleware(
t.Cfg.QueryRange,
t.Cfg.Querier.Engine,
ingesterQueryOptions{t.Cfg.Querier},
util_log.Logger,
t.Overrides,
t.Cfg.SchemaConfig,
schemas,
t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled,
prometheus.DefaultRegisterer,
t.Cfg.MetricsNamespace,

@ -649,7 +649,7 @@ func WeightedParallelism(
return 0
}
func minMaxModelTime(a, b model.Time) (min, max model.Time) {
func minMaxModelTime(a, b model.Time) (model.Time, model.Time) {
if a.Before(b) {
return a, b
}

@ -239,7 +239,6 @@ func NewMiddleware(
detectedFieldsTripperware, err := NewDetectedFieldsTripperware(
limits,
schema,
limitedTripperware,
logFilterTripperware,
)
@ -1222,7 +1221,6 @@ func sharedIndexTripperware(
// NewDetectedFieldsTripperware creates a new frontend tripperware responsible for handling detected field requests, which are basically log filter requests with a bit more processing.
func NewDetectedFieldsTripperware(
limits Limits,
_ config.SchemaConfig,
limitedTripperware base.Middleware,
logTripperware base.Middleware,
) (base.Middleware, error) {

@ -278,6 +278,13 @@ type SchemaConfig struct {
fileName string
}
func (cfg *SchemaConfig) Clone() SchemaConfig {
clone := *cfg
clone.Configs = make([]PeriodConfig, len(cfg.Configs))
copy(clone.Configs, cfg.Configs)
return clone
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.fileName, "schema-config-file", "", "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage.")

Loading…
Cancel
Save