Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/storage/config/schema_config.go

673 lines
23 KiB

package config
import (
"errors"
"flag"
"fmt"
"math"
"os"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/mtime"
"github.com/prometheus/common/model"
yaml "gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/grafana/loki/pkg/util/log"
)
const (
// Supported storage clients
StorageTypeAlibabaCloud = "alibabacloud"
StorageTypeAWS = "aws"
StorageTypeAWSDynamo = "aws-dynamo"
StorageTypeAzure = "azure"
StorageTypeBOS = "bos"
StorageTypeBoltDB = "boltdb"
StorageTypeCassandra = "cassandra"
StorageTypeInMemory = "inmemory"
StorageTypeBigTable = "bigtable"
StorageTypeBigTableHashed = "bigtable-hashed"
StorageTypeFileSystem = "filesystem"
StorageTypeGCP = "gcp"
StorageTypeGCPColumnKey = "gcp-columnkey"
StorageTypeGCS = "gcs"
StorageTypeGrpc = "grpc-store"
StorageTypeLocal = "local"
StorageTypeS3 = "s3"
StorageTypeSwift = "swift"
StorageTypeCOS = "cos"
// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
BoltDBShipperType = "boltdb-shipper"
TSDBType = "tsdb"
// ObjectStorageIndexRequiredPeriod defines the required index period for object storage based index stores like boltdb-shipper and tsdb
ObjectStorageIndexRequiredPeriod = 24 * time.Hour
)
var (
errInvalidSchemaVersion = errors.New("invalid schema version")
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
errInvalidTableName = errors.New("invalid table name")
errConfigFileNotSet = errors.New("schema config file needs to be set")
errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting")
errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order")
errCurrentBoltdbShipperNon24Hours = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period")
errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h")
errTSDBNon24HoursIndexPeriod = errors.New("tsdb must always have periodic config for index set to 24h")
errZeroLengthConfig = errors.New("must specify at least one schema configuration")
// regexp for finding the trailing index table number at the end of the table name
extractTableNumberRegex = regexp.MustCompile(`[0-9]+$`)
)
// ExtractTableNumberFromName extracts the table number from a given tableName.
// returns -1 on error.
func ExtractTableNumberFromName(tableName string) (int64, error) {
match := extractTableNumberRegex.Find([]byte(tableName))
if match == nil {
return -1, errInvalidTableName
}
tableNumber, err := strconv.ParseInt(string(match), 10, 64)
if err != nil {
return -1, err
}
return tableNumber, nil
}
// TableRange represents a range of table numbers built based on the configured schema start/end date and the table period.
// Both Start and End are inclusive.
type TableRange struct {
Start, End int64
PeriodConfig *PeriodConfig
}
// TableRanges represents a list of table ranges for multiple schemas.
type TableRanges []TableRange
// TableInRange tells whether given table falls in any of the ranges and the tableName has the right prefix based on the schema config.
func (t TableRanges) TableInRange(tableName string) (bool, error) {
tableNumber, err := ExtractTableNumberFromName(tableName)
if err != nil {
return false, err
}
cfg := t.ConfigForTableNumber(tableNumber)
return cfg != nil &&
fmt.Sprintf("%s%s", cfg.IndexTables.Prefix, strconv.Itoa(int(tableNumber))) == tableName, nil
}
func (t TableRanges) ConfigForTableNumber(tableNumber int64) *PeriodConfig {
for _, r := range t {
if cfg := r.ConfigForTableNumber(tableNumber); cfg != nil {
return cfg
}
}
return nil
}
func (t TableRanges) TableNameFor(table int64) (string, bool) {
cfg := t.ConfigForTableNumber(table)
if cfg == nil {
return "", false
}
return fmt.Sprintf("%s%d", cfg.IndexTables.Prefix, table), true
}
// TableInRange tells whether given table falls in the range and the tableName has the right prefix based on the schema config.
func (t TableRange) TableInRange(tableName string) (bool, error) {
// non-periodic tables
if t.PeriodConfig.IndexTables.Period == 0 {
return t.PeriodConfig.IndexTables.Prefix == tableName, nil
}
tableNumber, err := ExtractTableNumberFromName(tableName)
if err != nil {
return false, err
}
cfg := t.ConfigForTableNumber(tableNumber)
return cfg != nil &&
fmt.Sprintf("%s%s", cfg.IndexTables.Prefix, strconv.Itoa(int(tableNumber))) == tableName, nil
}
func (t TableRange) ConfigForTableNumber(tableNumber int64) *PeriodConfig {
if t.Start <= tableNumber && tableNumber <= t.End {
return t.PeriodConfig
}
return nil
}
// PeriodConfig defines the schema and tables to use for a period of time
type PeriodConfig struct {
// used when working with config
From DayTime `yaml:"from" doc:"description=The date of the first day that index buckets should be created. Use a date in the past if this is your only period_config, otherwise use a date when you want the schema to switch over. In YYYY-MM-DD format, for example: 2018-04-15."`
// type of index client to use.
IndexType string `yaml:"store" doc:"description=store and object_store below affect which <storage_config> key is used.\nWhich store to use for the index. Either aws, aws-dynamo, gcp, bigtable, bigtable-hashed, cassandra, boltdb or boltdb-shipper. "`
// type of object client to use; if omitted, defaults to store.
ObjectType string `yaml:"object_store" doc:"description=Which store to use for the chunks. Either aws, azure, gcp, bigtable, gcs, cassandra, swift, filesystem or a named_store (refer to named_stores_config). If omitted, defaults to the same value as store."`
Schema string `yaml:"schema" doc:"description=The schema version to use, current recommended schema is v11."`
IndexTables PeriodicTableConfig `yaml:"index" doc:"description=Configures how the index is updated and stored."`
ChunkTables PeriodicTableConfig `yaml:"chunks" doc:"description=Configured how the chunks are updated and stored."`
RowShards uint32 `yaml:"row_shards" doc:"description=How many shards will be created. Only used if schema is v10 or greater."`
// Integer representation of schema used for hot path calculation. Populated on unmarshaling.
schemaInt *int `yaml:"-"`
}
// UnmarshalYAML implements yaml.Unmarshaller.
func (cfg *PeriodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type plain PeriodConfig
err := unmarshal((*plain)(cfg))
if err != nil {
return err
}
// call VersionAsInt after unmarshaling to errcheck schema version and populate PeriodConfig.schemaInt
_, err = cfg.VersionAsInt()
return err
}
// GetIndexTableNumberRange returns the table number range calculated based on
// the configured schema start date, index table period and the given schemaEndDate
func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRange {
// non-periodic tables
if cfg.IndexTables.Period == 0 {
return TableRange{
PeriodConfig: cfg,
}
}
return TableRange{
Start: cfg.From.Unix() / int64(cfg.IndexTables.Period/time.Second),
End: schemaEndDate.Unix() / int64(cfg.IndexTables.Period/time.Second),
PeriodConfig: cfg,
}
}
// DayTime is a model.Time what holds day-aligned values, and marshals to/from
// YAML in YYYY-MM-DD format.
type DayTime struct {
model.Time
}
// MarshalYAML implements yaml.Marshaller.
func (d DayTime) MarshalYAML() (interface{}, error) {
return d.String(), nil
}
// UnmarshalYAML implements yaml.Unmarshaller.
func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error {
var from string
if err := unmarshal(&from); err != nil {
return err
}
t, err := time.Parse("2006-01-02", from)
if err != nil {
return err
}
d.Time = model.TimeFromUnix(t.Unix())
return nil
}
func (d *DayTime) String() string {
return d.Time.Time().UTC().Format("2006-01-02")
}
// SchemaConfig contains the config for our chunk index schemas
type SchemaConfig struct {
Configs []PeriodConfig `yaml:"configs"`
fileName string
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.fileName, "schema-config-file", "", "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage.")
}
// loadFromFile loads the schema config from a yaml file
func (cfg *SchemaConfig) loadFromFile() error {
if cfg.fileName == "" {
return errConfigFileNotSet
}
f, err := os.Open(cfg.fileName)
if err != nil {
return err
}
decoder := yaml.NewDecoder(f)
decoder.SetStrict(true)
return decoder.Decode(&cfg)
}
// Validate the schema config and returns an error if the validation
// doesn't pass
func (cfg *SchemaConfig) Validate() error {
if len(cfg.Configs) == 0 {
return errZeroLengthConfig
}
activePCIndex := ActivePeriodConfig((*cfg).Configs)
// if current index type is boltdb-shipper and there are no upcoming index types then it should be set to 24 hours.
if cfg.Configs[activePCIndex].IndexType == BoltDBShipperType &&
cfg.Configs[activePCIndex].IndexTables.Period != ObjectStorageIndexRequiredPeriod && len(cfg.Configs)-1 == activePCIndex {
return errCurrentBoltdbShipperNon24Hours
}
// if upcoming index type is boltdb-shipper, it should always be set to 24 hours.
if len(cfg.Configs)-1 > activePCIndex && (cfg.Configs[activePCIndex+1].IndexType == BoltDBShipperType &&
cfg.Configs[activePCIndex+1].IndexTables.Period != ObjectStorageIndexRequiredPeriod) {
return errUpcomingBoltdbShipperNon24Hours
}
for i := range cfg.Configs {
periodCfg := &cfg.Configs[i]
periodCfg.applyDefaults()
if err := periodCfg.validate(); err != nil {
return err
}
if i+1 < len(cfg.Configs) {
if cfg.Configs[i].From.Time.Unix() >= cfg.Configs[i+1].From.Time.Unix() {
return errSchemaIncreasingFromTime
}
}
}
return nil
}
// ActivePeriodConfig returns index of active PeriodicConfig which would be applicable to logs that would be pushed starting now.
// Note: Another PeriodicConfig might be applicable for future logs which can change index type.
func ActivePeriodConfig(configs []PeriodConfig) int {
now := model.Now()
i := sort.Search(len(configs), func(i int) bool {
return configs[i].From.Time > now
})
if i > 0 {
i--
}
return i
}
func usingForPeriodConfigs(configs []PeriodConfig, fn func(string) bool) bool {
activePCIndex := ActivePeriodConfig(configs)
if fn(configs[activePCIndex].IndexType) ||
(len(configs)-1 > activePCIndex && fn(configs[activePCIndex+1].IndexType)) {
return true
}
return false
}
// IsObjectStorageIndex returns true if the index type is either boltdb-shipper or tsdb.
func IsObjectStorageIndex(indexType string) bool {
return indexType == BoltDBShipperType || indexType == TSDBType
}
// UsingObjectStorageIndex returns true if the current or any of the upcoming periods
// use an object store index.
func UsingObjectStorageIndex(configs []PeriodConfig) bool {
return usingForPeriodConfigs(configs, IsObjectStorageIndex)
}
func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
return 0
default:
return 16
}
}
// ForEachAfter will call f() on every entry after t, splitting
// entries if necessary so there is an entry starting at t
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
for i := 0; i < len(cfg.Configs); i++ {
if t > cfg.Configs[i].From.Time &&
(i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) {
// Split the i'th entry by duplicating then overwriting the From time
cfg.Configs = append(cfg.Configs[:i+1], cfg.Configs[i:]...)
cfg.Configs[i+1].From = DayTime{t}
}
if cfg.Configs[i].From.Time >= t {
f(&cfg.Configs[i])
}
}
}
func validateChunks(cfg PeriodConfig) error {
objectStore := cfg.IndexType
if cfg.ObjectType != "" {
objectStore = cfg.ObjectType
}
switch objectStore {
case "cassandra", "aws-dynamo", "bigtable-hashed", "gcp", "gcp-columnkey", "bigtable", "grpc-store":
if cfg.ChunkTables.Prefix == "" {
return errConfigChunkPrefixNotSet
}
return nil
default:
return nil
}
}
func (cfg *PeriodConfig) applyDefaults() {
if cfg.RowShards == 0 {
cfg.RowShards = defaultRowShards(cfg.Schema)
}
}
// ChunkFormat returns chunk format including it's headBlockFormat corresponding to the `schema` version
// in the given `PeriodConfig`.
func (cfg *PeriodConfig) ChunkFormat() (byte, chunkenc.HeadBlockFmt, error) {
sver, err := cfg.VersionAsInt()
if err != nil {
return 0, 0, fmt.Errorf("failed to get chunk format: %w", err)
}
switch {
case sver <= 12:
return chunkenc.ChunkFormatV3, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV3), nil
default: // for v13 and above
return chunkenc.ChunkFormatV4, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), nil
}
}
// TSDBFormat returns index format corresponding to the `schema` version
// in the given `PeriodConfig`.
func (cfg *PeriodConfig) TSDBFormat() (int, error) {
sver, err := cfg.VersionAsInt()
if err != nil {
return 0, fmt.Errorf("failed to get index format: %w", err)
}
switch {
case sver <= 12:
return index.FormatV2, nil
default: // for v13 and above
return index.FormatV3, nil
}
}
// Validate the period config.
func (cfg PeriodConfig) validate() error {
validateError := validateChunks(cfg)
if validateError != nil {
return validateError
}
if cfg.IndexType == TSDBType && cfg.IndexTables.Period != ObjectStorageIndexRequiredPeriod {
return errTSDBNon24HoursIndexPeriod
}
// Ensure the tables period is a multiple of the bucket period
if cfg.IndexTables.Period > 0 && cfg.IndexTables.Period%(24*time.Hour) != 0 {
return errInvalidTablePeriod
}
if cfg.ChunkTables.Period > 0 && cfg.ChunkTables.Period%(24*time.Hour) != 0 {
return errInvalidTablePeriod
}
v, err := cfg.VersionAsInt()
if err != nil {
return err
}
switch v {
case 10, 11, 12, 13:
if cfg.RowShards == 0 {
return fmt.Errorf("must have row_shards > 0 (current: %d) for schema (%s)", cfg.RowShards, cfg.Schema)
}
case 9:
return nil
default:
return errInvalidSchemaVersion
}
return nil
}
// Load the yaml file, or build the config from legacy command-line flags
func (cfg *SchemaConfig) Load() error {
if len(cfg.Configs) > 0 {
return nil
}
// Load config from file.
if err := cfg.loadFromFile(); err != nil {
return err
}
return cfg.Validate()
}
func (cfg *PeriodConfig) VersionAsInt() (int, error) {
// Read memoized schema version. This is called during unmarshaling,
// but may be nil in the case of testware.
if cfg.schemaInt != nil {
return *cfg.schemaInt, nil
}
v := strings.Trim(cfg.Schema, "v")
n, err := strconv.Atoi(v)
if err != nil {
err = fmt.Errorf("invalid schema version: %w", err)
}
cfg.schemaInt = &n
return n, err
}
// PeriodicTableConfig is configuration for a set of time-sharded tables.
type PeriodicTableConfig struct {
Prefix string `yaml:"prefix" doc:"description=Table prefix for all period tables."`
Period time.Duration `yaml:"period" doc:"description=Table period."`
Tags Tags `yaml:"tags" doc:"description=A map to be added to all managed tables."`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (cfg *PeriodicTableConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
g := struct {
Prefix string `yaml:"prefix"`
Period model.Duration `yaml:"period"`
Tags Tags `yaml:"tags"`
}{}
if err := unmarshal(&g); err != nil {
return err
}
cfg.Prefix = g.Prefix
cfg.Period = time.Duration(g.Period)
cfg.Tags = g.Tags
return nil
}
// MarshalYAML implements the yaml.Marshaler interface.
func (cfg PeriodicTableConfig) MarshalYAML() (interface{}, error) {
g := &struct {
Prefix string `yaml:"prefix"`
Period model.Duration `yaml:"period"`
Tags Tags `yaml:"tags"`
}{
Prefix: cfg.Prefix,
Period: model.Duration(cfg.Period),
Tags: cfg.Tags,
}
return g, nil
}
// AutoScalingConfig for DynamoDB tables.
type AutoScalingConfig struct {
Enabled bool `yaml:"enabled"`
RoleARN string `yaml:"role_arn"`
MinCapacity int64 `yaml:"min_capacity"`
MaxCapacity int64 `yaml:"max_capacity"`
OutCooldown int64 `yaml:"out_cooldown"`
InCooldown int64 `yaml:"in_cooldown"`
TargetValue float64 `yaml:"target"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, argPrefix+".enabled", false, "Should we enable autoscale for the table.")
f.StringVar(&cfg.RoleARN, argPrefix+".role-arn", "", "AWS AutoScaling role ARN")
f.Int64Var(&cfg.MinCapacity, argPrefix+".min-capacity", 3000, "DynamoDB minimum provision capacity.")
f.Int64Var(&cfg.MaxCapacity, argPrefix+".max-capacity", 6000, "DynamoDB maximum provision capacity.")
f.Int64Var(&cfg.OutCooldown, argPrefix+".out-cooldown", 1800, "DynamoDB minimum seconds between each autoscale up.")
f.Int64Var(&cfg.InCooldown, argPrefix+".in-cooldown", 1800, "DynamoDB minimum seconds between each autoscale down.")
f.Float64Var(&cfg.TargetValue, argPrefix+".target-value", 80, "DynamoDB target ratio of consumed capacity to provisioned capacity.")
}
func (cfg *PeriodicTableConfig) PeriodicTables(from, through model.Time, pCfg ProvisionConfig, beginGrace, endGrace time.Duration, retention time.Duration) []TableDesc {
var (
periodSecs = int64(cfg.Period / time.Second)
beginGraceSecs = int64(beginGrace / time.Second)
endGraceSecs = int64(endGrace / time.Second)
firstTable = from.Unix() / periodSecs
lastTable = through.Unix() / periodSecs
tablesToKeep = int64(retention/time.Second) / periodSecs
now = mtime.Now().Unix()
nowWeek = now / periodSecs
result = []TableDesc{}
)
// If interval ends exactly on a period boundary, don’t include the upcoming period
if through.Unix()%periodSecs == 0 {
lastTable--
}
// Don't make tables further back than the configured retention
if retention > 0 && lastTable > tablesToKeep && lastTable-firstTable >= tablesToKeep {
firstTable = lastTable - tablesToKeep
}
for i := firstTable; i <= lastTable; i++ {
tableName := cfg.tableForPeriod(i)
table := TableDesc{}
// if now is within table [start - grace, end + grace), then we need some write throughput
if (i*periodSecs)-beginGraceSecs <= now && now < (i*periodSecs)+periodSecs+endGraceSecs {
table = pCfg.ActiveTableProvisionConfig.BuildTableDesc(tableName, cfg.Tags)
level.Debug(log.Logger).Log("msg", "Table is Active",
"tableName", table.Name,
"provisionedRead", table.ProvisionedRead,
"provisionedWrite", table.ProvisionedWrite,
"useOnDemandMode", table.UseOnDemandIOMode,
"useWriteAutoScale", table.WriteScale.Enabled,
"useReadAutoScale", table.ReadScale.Enabled)
} else {
// Autoscale last N tables
// this is measured against "now", since the lastWeek is the final week in the schema config range
// the N last tables in that range will always be set to the inactive scaling settings.
disableAutoscale := i < (nowWeek - pCfg.InactiveWriteScaleLastN)
table = pCfg.InactiveTableProvisionConfig.BuildTableDesc(tableName, cfg.Tags, disableAutoscale)
level.Debug(log.Logger).Log("msg", "Table is Inactive",
"tableName", table.Name,
"provisionedRead", table.ProvisionedRead,
"provisionedWrite", table.ProvisionedWrite,
"useOnDemandMode", table.UseOnDemandIOMode,
"useWriteAutoScale", table.WriteScale.Enabled,
"useReadAutoScale", table.ReadScale.Enabled)
}
result = append(result, table)
}
return result
}
// ChunkTableFor calculates the chunk table shard for a given point in time.
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) {
for i := range cfg.Configs {
if t >= cfg.Configs[i].From.Time && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) {
return cfg.Configs[i].ChunkTables.TableFor(t), nil
}
}
return "", fmt.Errorf("no chunk table found for time %v", t)
}
// SchemaForTime returns the Schema PeriodConfig to use for a given point in time.
func (cfg SchemaConfig) SchemaForTime(t model.Time) (PeriodConfig, error) {
for i := range cfg.Configs {
// TODO: callum, confirm we can rely on the schema configs being sorted in this order.
if t >= cfg.Configs[i].From.Time && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) {
return cfg.Configs[i], nil
}
}
return PeriodConfig{}, fmt.Errorf("no schema config found for time %v", t)
}
// TableFor calculates the table shard for a given point in time.
func (cfg *PeriodicTableConfig) TableFor(t model.Time) string {
if cfg.Period == 0 { // non-periodic
return cfg.Prefix
}
periodSecs := int64(cfg.Period / time.Second)
return cfg.tableForPeriod(t.Unix() / periodSecs)
}
func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string {
return cfg.Prefix + strconv.Itoa(int(i))
}
// Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From
func (cfg SchemaConfig) ExternalKey(ref logproto.ChunkRef) string {
p, err := cfg.SchemaForTime(ref.From)
v, _ := p.VersionAsInt()
if err == nil && v >= 12 {
return newerExternalKey(ref)
}
return newExternalKey(ref)
}
// VersionForChunk will return the schema version associated with the `From` timestamp of a chunk.
// The schema and chunk must be valid+compatible as the errors are not checked.
func (cfg SchemaConfig) VersionForChunk(ref logproto.ChunkRef) int {
p, _ := cfg.SchemaForTime(ref.From)
v, _ := p.VersionAsInt()
return v
}
// post-checksum
func newExternalKey(ref logproto.ChunkRef) string {
// This is the inverse of chunk.parseNewExternalKey.
return fmt.Sprintf("%s/%x:%x:%x:%x", ref.UserID, ref.Fingerprint, int64(ref.From), int64(ref.Through), ref.Checksum)
}
// v12+
func newerExternalKey(ref logproto.ChunkRef) string {
return fmt.Sprintf("%s/%x/%x:%x:%x", ref.UserID, ref.Fingerprint, int64(ref.From), int64(ref.Through), ref.Checksum)
}
func GetIndexStoreTableRanges(indexType string, periodicConfigs []PeriodConfig) TableRanges {
var ranges TableRanges
for i := range periodicConfigs {
if periodicConfigs[i].IndexType != indexType {
continue
}
periodEndTime := DayTime{Time: math.MaxInt64}
if i < len(periodicConfigs)-1 {
periodEndTime = DayTime{Time: periodicConfigs[i+1].From.Time.Add(-time.Millisecond)}
}
ranges = append(ranges, periodicConfigs[i].GetIndexTableNumberRange(periodEndTime))
}
return ranges
}