index-shipper: add support for multiple stores (#7754)

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>

**What this PR does / why we need it**:
Currently, Loki initializes a single instance of index-shipper to [handle
all the table
ranges](ff7b462973/pkg/storage/factory.go (L188))
(from across periods) for a given index type (`boltdb-shipper`, `tsdb`).
Since the index-shipper only holds the object client for the store
defined by `shared_store_type`, index uploads are limited to a single
store. Setting `shared_store_type` to a different store at a later point
in time would mean losing access to the indexes stored in the previously
configured store.

With this PR, we initialize a separate index-shipper & table manager for
each period if `shared_store_type` is not explicitly configured. This
offers the flexibility to store the index in multiple stores (across
providers).
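
As a minimal sketch (assuming the `config.GetIndexStoreTableRanges` helper and the `PeriodConfig` fields introduced by this PR; the dates and the `store-1` named store are hypothetical, mirroring the integration test setup), each period now yields its own table range and object store:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/storage/config"
)

func main() {
	day := func(t time.Time) config.DayTime {
		return config.DayTime{Time: model.TimeFromUnix(t.Unix())}
	}

	// Two periods of the same index type pointing at different object stores.
	// With this PR, each period gets its own index-shipper wired to that
	// period's object_store instead of a single shared_store_type.
	periods := []config.PeriodConfig{
		{
			From:        day(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)),
			IndexType:   config.BoltDBShipperType,
			ObjectType:  "store-1", // hypothetical named store
			Schema:      "v11",
			IndexTables: config.PeriodicTableConfig{Prefix: "index_", Period: 24 * time.Hour},
		},
		{
			From:        day(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)),
			IndexType:   config.BoltDBShipperType,
			ObjectType:  config.StorageTypeFileSystem,
			Schema:      "v11",
			IndexTables: config.PeriodicTableConfig{Prefix: "index_", Period: 24 * time.Hour},
		},
	}

	// One non-overlapping TableRange per period; the shipper created for a
	// period only uploads/downloads tables inside its own range.
	for _, r := range config.GetIndexStoreTableRanges(config.BoltDBShipperType, periods) {
		fmt.Printf("object_store=%s tables=index_%d..index_%d\n",
			r.PeriodConfig.ObjectType, r.Start, r.End)
	}
}
```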

**Note**:
- usage of `shared_store_type` in this commit text refers to one of
these config options, depending on the index in use:
`-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store`
- `shared_store_type` used to default to the `object_store` from the
latest `period_config` if not explicitly configured. This PR removes
these defaults in favor of supporting index uploads to multiple stores;
see the sketch after this list.
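
The removed defaulting leaves a simple precedence rule; a minimal sketch of it (not the exact factory code, which lives in `pkg/storage/factory.go`):

```go
package main

import "fmt"

// resolveObjectStore sketches the precedence after this PR: an explicitly
// configured shared-store still wins for all periods; otherwise each period
// ships its index to the object_store from its own period_config.
func resolveObjectStore(sharedStoreType, periodObjectStore string) string {
	if sharedStoreType != "" {
		return sharedStoreType // -boltdb.shipper.shared-store / -tsdb.shipper.shared-store
	}
	return periodObjectStore
}

func main() {
	fmt.Println(resolveObjectStore("", "store-1"))   // no shared-store configured: store-1
	fmt.Println(resolveObjectStore("s3", "store-1")) // shared-store set: s3
}
```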

**Which issue(s) this PR fixes**:
Fixes #7276

**Special notes for your reviewer**:
All the instances of the downloads table manager operate on the same
cacheDir. This shouldn't be a problem, as the tableRanges do not overlap
across periods; the sketch below illustrates this.
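
A minimal sketch of why that holds, using the reworked `TableRange` API from this PR (the table numbers are hypothetical):

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/storage/config"
)

func main() {
	tables := config.PeriodicTableConfig{Prefix: "index_", Period: 24 * time.Hour}

	// Adjacent periods produce adjacent, non-overlapping table number ranges,
	// so even though every downloads table manager shares one cacheDir, any
	// given table directory is claimed by exactly one manager.
	first := config.TableRange{Start: 19000, End: 19001, PeriodConfig: &config.PeriodConfig{IndexTables: tables}}
	second := config.TableRange{Start: 19002, End: 19005, PeriodConfig: &config.PeriodConfig{IndexTables: tables}}

	for _, name := range []string{"index_19001", "index_19003"} {
		inFirst, _ := first.TableInRange(name) // error only for malformed table names
		inSecond, _ := second.TableInRange(name)
		fmt.Printf("%s: first=%v second=%v\n", name, inFirst, inSecond)
	}
}
```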

**Checklist**
- [X] Reviewed the `CONTRIBUTING.md` guide
- [ ] Documentation added
- [X] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

---------

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
Co-authored-by: J Stickler <julie.stickler@grafana.com>
34 files changed:

CHANGELOG.md | 1
docs/sources/upgrading/_index.md | 4
integration/client/client.go | 2
integration/cluster/cluster.go | 65
integration/cluster/schema.go | 61
integration/loki_micro_services_delete_test.go | 10
integration/loki_micro_services_test.go | 138
integration/util/merger.go | 2
pkg/loki/config_wrapper.go | 10
pkg/loki/config_wrapper_test.go | 16
pkg/loki/modules.go | 28
pkg/loki/modules_test.go | 16
pkg/storage/config/schema_config.go | 86
pkg/storage/config/schema_config_test.go | 209
pkg/storage/factory.go | 66
pkg/storage/factory_test.go | 3
pkg/storage/store.go | 72
pkg/storage/store_test.go | 293
pkg/storage/stores/indexshipper/downloads/table_manager.go | 103
pkg/storage/stores/indexshipper/downloads/table_manager_test.go | 126
pkg/storage/stores/indexshipper/shipper.go | 27
pkg/storage/stores/indexshipper/uploads/table_manager.go | 14
pkg/storage/stores/indexshipper/uploads/table_manager_test.go | 3
pkg/storage/stores/shipper/index/table_manager.go | 41
pkg/storage/stores/shipper/index/table_manager_test.go | 61
pkg/storage/stores/shipper/indexgateway/gateway.go | 89
pkg/storage/stores/shipper/indexgateway/gateway_test.go | 102
pkg/storage/stores/shipper/shipper_index_client.go | 16
pkg/storage/stores/tsdb/head_manager.go | 118
pkg/storage/stores/tsdb/head_manager_test.go | 190
pkg/storage/stores/tsdb/index_client_test.go | 56
pkg/storage/stores/tsdb/index_shipper_querier.go | 8
pkg/storage/stores/tsdb/manager.go | 77
pkg/storage/stores/tsdb/store.go | 59

@ -86,6 +86,7 @@
* [8271](https://github.com/grafana/loki/pull/8271) **kavirajk**: logql: Support urlencode and urldecode template functions
* [8259](https://github.com/grafana/loki/pull/8259) **mar4uk**: Extract push.proto from the logproto package to the separate module.
* [7906](https://github.com/grafana/loki/pull/7906) **kavirajk**: Add API endpoint that formats LogQL expressions and support new `fmt` subcommand in `logcli` to format LogQL query.
* [7754](https://github.com/grafana/loki/pull/7754) **ashwanthgoli**: index-shipper: add support for multiple stores.
* [6675](https://github.com/grafana/loki/pull/6675) **btaani**: Add logfmt expression parser for selective extraction of labels from logfmt formatted logs
* [8474](https://github.com/grafana/loki/pull/8474) **farodin91**: Add support for short-lived S3 session tokens
* [8774](https://github.com/grafana/loki/pull/8774) **slim-bean**: Add new logql template functions `bytes`, `duration`, `unixEpochMillis`, `unixEpochNanos`, `toDateInZone`, `b64Enc`, and `b64Dec`

@ -141,6 +141,10 @@ level=info ts=2022-12-20T15:27:54.858554127Z caller=metrics.go:147 component=fro
These statistics are also displayed when using `--stats` with LogCLI.
#### Index shipper multi-store support
In releases prior to 2.8.1, if you did not explicitly configure `-boltdb.shipper.shared-store` or `-tsdb.shipper.shared-store`, those values defaulted to the `object_store` configured in the latest `period_config` of the corresponding index type.
In releases 2.8.1 and later, these defaults are removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the BoltDB and TSDB indexes are shipped to the `object_store` configured for that period.
## 2.7.0
### Loki

@ -456,7 +456,7 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
func (c *Client) rangeQueryURL(query string) string {
v := url.Values{}
v.Set("query", query)
v.Set("start", formatTS(c.Now.Add(-2*time.Hour)))
v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour)))
v.Set("end", formatTS(c.Now.Add(time.Second)))
u, err := url.Parse(c.baseURL)

@ -20,11 +20,13 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/integration/util"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
@ -60,22 +62,19 @@ limits_config:
per_stream_rate_limit_burst: 50MB
ingestion_rate_mb: 50
ingestion_burst_size_mb: 50
reject_old_samples: false
storage_config:
named_stores:
filesystem:
store-1:
directory: {{.sharedDataPath}}/fs-store-1
boltdb_shipper:
shared_store: filesystem
active_index_directory: {{.dataPath}}/index
cache_location: {{.dataPath}}/boltdb-cache
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
tsdb_shipper:
active_index_directory: {{.dataPath}}/tsdb-index
cache_location: {{.dataPath}}/tsdb-cache
compactor:
working_directory: {{.dataPath}}/retention
@ -92,6 +91,9 @@ ingester:
querier:
multi_tenant_queries_enabled: true
query_scheduler:
max_outstanding_requests_per_tenant: 2048
ruler:
enable_api: true
ring:
@ -104,7 +106,6 @@ ruler:
local:
directory: {{.sharedDataPath}}/rules
rule_path: {{.sharedDataPath}}/prom-rule
`))
)
@ -139,12 +140,14 @@ func (w *wrappedRegisterer) MustRegister(collectors ...prometheus.Collector) {
type Cluster struct {
sharedPath string
overridesFile string
components []*Component
waitGroup sync.WaitGroup
initedAt model.Time
periodCfgs []string
overridesFile string
}
func New(logLevel level.Value) *Cluster {
func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
if logLevel != nil {
util_log.Logger = level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.Allow(logLevel))
}
@ -162,10 +165,17 @@ func New(logLevel level.Value) *Cluster {
panic(fmt.Errorf("error creating overrides file: %w", err))
}
return &Cluster{
cluster := &Cluster{
sharedPath: sharedPath,
initedAt: model.Now(),
overridesFile: overridesFile,
}
for _, opt := range opts {
opt(cluster)
}
return cluster
}
func (c *Cluster) Run() error {
@ -266,6 +276,11 @@ func (c *Component) ClusterSharedPath() string {
return c.cluster.sharedPath
}
// component should be restarted if it's already running for the new flags to take effect
func (c *Component) AddFlags(flags ...string) {
c.flags = append(c.flags, flags...)
}
func (c *Component) HTTPURL() string {
return fmt.Sprintf("http://localhost:%s", port(c.loki.Server.HTTPListenAddr().String()))
}
@ -320,6 +335,9 @@ func (c *Component) writeConfig() error {
func (c *Component) MergedConfig() ([]byte, error) {
var sb bytes.Buffer
periodStart := config.DayTime{Time: c.cluster.initedAt.Add(-24 * time.Hour)}
additionalPeriodStart := config.DayTime{Time: c.cluster.initedAt.Add(-7 * 24 * time.Hour)}
if err := configTemplate.Execute(&sb, map[string]interface{}{
"dataPath": c.dataPath,
"sharedDataPath": c.cluster.sharedPath,
@ -330,6 +348,23 @@ func (c *Component) MergedConfig() ([]byte, error) {
merger := util.NewYAMLMerger()
merger.AddFragment(sb.Bytes())
// default to using boltdb index
if len(c.cluster.periodCfgs) == 0 {
c.cluster.periodCfgs = []string{boltDBShipperSchemaConfigTemplate}
}
for _, periodCfg := range c.cluster.periodCfgs {
var buf bytes.Buffer
if err := template.Must(template.New("schema").Parse(periodCfg)).
Execute(&buf, map[string]interface{}{
"curPeriodStart": periodStart.String(),
"additionalPeriodStart": additionalPeriodStart.String(),
}); err != nil {
return nil, errors.New("error building schema_config")
}
merger.AddFragment(buf.Bytes())
}
for _, extra := range c.extraConfigs {
merger.AddFragment([]byte(extra))
}

@ -0,0 +1,61 @@
package cluster
var (
boltDBShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.curPeriodStart}}
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
`
additionalBoltDBShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.additionalPeriodStart}}
store: boltdb-shipper
object_store: store-1
schema: v11
index:
prefix: index_
period: 24h
`
tsdbShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.curPeriodStart}}
store: tsdb
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
`
additionalTSDBShipperSchemaConfigTemplate = `
schema_config:
configs:
- from: {{.additionalPeriodStart}}
store: tsdb
object_store: store-1
schema: v11
index:
prefix: index_
period: 24h
`
)
func WithAdditionalBoltDBPeriod(c *Cluster) {
c.periodCfgs = append(c.periodCfgs, additionalBoltDBShipperSchemaConfigTemplate, boltDBShipperSchemaConfigTemplate)
}
func WithAdditionalTSDBPeriod(c *Cluster) {
c.periodCfgs = append(c.periodCfgs, additionalTSDBShipperSchemaConfigTemplate, tsdbShipperSchemaConfigTemplate)
}
func WithBoltDBAndTSDBPeriods(c *Cluster) {
c.periodCfgs = append(c.periodCfgs, additionalBoltDBShipperSchemaConfigTemplate, tsdbShipperSchemaConfigTemplate)
}

@ -12,15 +12,15 @@ import (
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/storage"
)
func TestMicroServicesDeleteRequest(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil)
defer func() {
assert.NoError(t, clu.Cleanup())
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
}()
// initially, run only compactor, index-gateway and distributor.
@ -187,7 +187,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 5)
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
})
@ -218,7 +218,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// Query lines
t.Run("verify query time filtering", func(t *testing.T) {
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
// update expectedStreams as per the issued requests
@ -275,7 +275,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
require.NoError(t, tQuerier.SetTenantLimits(tenantID, tenantLimits))
// restart querier to make it sync the index
storage.ResetBoltDBIndexClientWithShipper()
storage.ResetBoltDBIndexClientsWithShipper()
require.NoError(t, tQuerier.Restart())
// ensure the deletion-mode limit is updated

@ -10,7 +10,7 @@ import (
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/querylimits"
)
@ -88,18 +88,11 @@ func TestMicroServicesIngestQuery(t *testing.T) {
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
t.Run("ingest-logs-store", func(t *testing.T) {
t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
// TODO: Flushing is currently causing a panic, as the boltdb shipper is shared using a global variable in:
// https://github.com/grafana/loki/blob/66a4692423582ed17cce9bd86b69d55663dc7721/pkg/storage/factory.go#L32-L35
//require.NoError(t, cliIngester.Flush())
})
t.Run("ingest-logs-ingester", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})
@ -139,3 +132,130 @@ func TestMicroServicesIngestQuery(t *testing.T) {
require.ErrorContains(t, err, "the query time range exceeds the limit (query length")
})
}
func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) {
for name, opt := range map[string]func(c *cluster.Cluster){
"boltdb-index": cluster.WithAdditionalBoltDBPeriod,
"tsdb-index": cluster.WithAdditionalTSDBPeriod,
"boltdb-and-tsdb": cluster.WithBoltDBAndTSDBPeriods,
} {
t.Run(name, func(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil, opt)
defer func() {
storage.ResetBoltDBIndexClientsWithShipper()
assert.NoError(t, clu.Cleanup())
}()
// initially, run only compactor and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only ingester and query-scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
)
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
t.Run("ingest-logs", func(t *testing.T) {
// ingest logs to the previous period
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-36*time.Hour), map[string]string{"job": "fake"}))
// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})
t.Run("query-lookback-default", func(t *testing.T) {
// queries ingesters with the default lookback period (3h)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineC", "lineD"}, lines)
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
// restart querier and index shipper to sync the index
storage.ResetBoltDBIndexClientsWithShipper()
tQuerier.AddFlags("-querier.query-store-only=true")
require.NoError(t, tQuerier.Restart())
})
// Query lines
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
})
}
}

@ -29,7 +29,7 @@ func (m *YAMLMerger) Merge() ([]byte, error) {
return nil, fmt.Errorf("failed to unmarshal given fragment %q to map: %w", fragment, err)
}
if err = mergo.Merge(&merged, fragmentMap, mergo.WithOverride, mergo.WithTypeCheck); err != nil {
if err = mergo.Merge(&merged, fragmentMap, mergo.WithOverride, mergo.WithTypeCheck, mergo.WithAppendSlice); err != nil {
return nil, fmt.Errorf("failed to merge fragment %q with base: %w", fragment, err)
}
}

@ -522,11 +522,6 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
if cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType == defaults.StorageConfig.BoltDBShipperConfig.SharedStoreType {
cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType = period.ObjectType
}
if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = period.ObjectType
}
@ -545,11 +540,6 @@ func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper, period config.Per
}
func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper, period config.PeriodConfig) {
if cfg.StorageConfig.TSDBShipperConfig.SharedStoreType == defaults.StorageConfig.TSDBShipperConfig.SharedStoreType {
cfg.StorageConfig.TSDBShipperConfig.SharedStoreType = period.ObjectType
}
if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = period.ObjectType
}

@ -788,22 +788,6 @@ compactor:
})
t.Run("when using boltdb storage type", func(t *testing.T) {
t.Run("default storage_config.boltdb.shared_store to the value of current_schema.object_store", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:
configs:
- from: 2021-08-01
store: boltdb-shipper
object_store: s3
schema: v11
index:
prefix: index_
period: 24h`
cfg, _ := testContext(boltdbSchemaConfig, nil)
assert.Equal(t, config.StorageTypeS3, cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType)
})
t.Run("default compactor.shared_store to the value of current_schema.object_store", func(t *testing.T) {
const boltdbSchemaConfig = `---
schema_config:

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"hash/fnv"
"math"
"net/http"
"net/http/httputil"
"net/url"
@ -1112,15 +1113,32 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
var indexClient indexgateway.IndexClient
if schemaHasBoltDBShipperConfig(t.Cfg.SchemaConfig) {
var err error
indexClient, err = storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant, prometheus.DefaultRegisterer)
var indexClients []indexgateway.IndexClientWithRange
for i, period := range t.Cfg.SchemaConfig.Configs {
if period.IndexType != config.BoltDBShipperType {
continue
}
periodEndTime := config.DayTime{Time: math.MaxInt64}
if i < len(t.Cfg.SchemaConfig.Configs)-1 {
periodEndTime = config.DayTime{Time: t.Cfg.SchemaConfig.Configs[i+1].From.Time.Add(-time.Millisecond)}
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)
indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())),
)
if err != nil {
return nil, err
}
indexClients = append(indexClients, indexgateway.IndexClientWithRange{
IndexClient: indexClient,
TableRange: tableRange,
})
}
gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Store, indexClient)
gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Store, indexClients)
if err != nil {
return nil, err
}

@ -2,6 +2,7 @@ package loki
import (
"fmt"
"path"
"path/filepath"
"testing"
"time"
@ -369,8 +370,8 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
BoltDBShipperConfig: shipper.Config{
Config: indexshipper.Config{
SharedStoreType: config.StorageTypeFileSystem,
ActiveIndexDirectory: dir,
CacheLocation: dir,
ActiveIndexDirectory: path.Join(dir, "index"),
CacheLocation: path.Join(dir, "cache"),
Mode: indexshipper.ModeWriteOnly,
ResyncInterval: 24 * time.Hour,
},
@ -380,10 +381,13 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
cfg.SchemaConfig = config.SchemaConfig{
Configs: []config.PeriodConfig{
{
IndexType: config.StorageTypeInMemory,
IndexType: config.BoltDBShipperType,
ObjectType: config.StorageTypeFileSystem,
RowShards: 16,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Period: time.Hour * 24,
},
RowShards: 16,
Schema: "v11",
From: config.DayTime{
Time: model.Now(),
},
@ -397,6 +401,8 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
cfg.IndexGateway.Mode = indexgateway.SimpleMode
cfg.IndexGateway.Ring.InstanceAddr = localhost
cfg.CompactorConfig.CompactorRing.InstanceAddr = localhost
cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem
cfg.CompactorConfig.WorkingDirectory = path.Join(dir, "compactor")
cfg.Ruler.Config.Ring.InstanceAddr = localhost
cfg.Ruler.Config.StoreConfig.Type = config.StorageTypeLocal

@ -4,7 +4,9 @@ import (
"errors"
"flag"
"fmt"
"math"
"os"
"regexp"
"sort"
"strconv"
"strings"
@ -52,6 +54,7 @@ const (
var (
errInvalidSchemaVersion = errors.New("invalid schema version")
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
errInvalidTableName = errors.New("invalid table name")
errConfigFileNotSet = errors.New("schema config file needs to be set")
errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting")
errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order")
@ -60,8 +63,27 @@ var (
errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h")
errTSDBNon24HoursIndexPeriod = errors.New("tsdb must always have periodic config for index set to 24h")
errZeroLengthConfig = errors.New("must specify at least one schema configuration")
// regexp for finding the trailing index table number at the end of the table name
extractTableNumberRegex = regexp.MustCompile(`[0-9]+$`)
)
// ExtractTableNumberFromName extracts the table number from a given tableName.
// returns -1 on error.
func ExtractTableNumberFromName(tableName string) (int64, error) {
match := extractTableNumberRegex.Find([]byte(tableName))
if match == nil {
return -1, errInvalidTableName
}
tableNumber, err := strconv.ParseInt(string(match), 10, 64)
if err != nil {
return -1, err
}
return tableNumber, nil
}
// TableRange represents a range of table numbers built based on the configured schema start/end date and the table period.
// Both Start and End are inclusive.
type TableRange struct {
@ -73,21 +95,52 @@ type TableRange struct {
type TableRanges []TableRange
// TableInRange tells whether given table falls in any of the ranges and the tableName has the right prefix based on the schema config.
func (t TableRanges) TableInRange(tableNumber int64, tableName string) bool {
func (t TableRanges) TableInRange(tableName string) (bool, error) {
tableNumber, err := ExtractTableNumberFromName(tableName)
if err != nil {
return false, err
}
cfg := t.ConfigForTableNumber(tableNumber)
return cfg != nil && fmt.Sprintf("%s%s", cfg.IndexTables.Prefix, strconv.Itoa(int(tableNumber))) == tableName
return cfg != nil &&
fmt.Sprintf("%s%s", cfg.IndexTables.Prefix, strconv.Itoa(int(tableNumber))) == tableName, nil
}
func (t TableRanges) ConfigForTableNumber(tableNumber int64) *PeriodConfig {
for _, r := range t {
if r.Start <= tableNumber && tableNumber <= r.End {
return r.PeriodConfig
if cfg := r.ConfigForTableNumber(tableNumber); cfg != nil {
return cfg
}
}
return nil
}
// TableInRange tells whether given table falls in the range and the tableName has the right prefix based on the schema config.
func (t TableRange) TableInRange(tableName string) (bool, error) {
// non-periodic tables
if t.PeriodConfig.IndexTables.Period == 0 {
return t.PeriodConfig.IndexTables.Prefix == tableName, nil
}
tableNumber, err := ExtractTableNumberFromName(tableName)
if err != nil {
return false, err
}
cfg := t.ConfigForTableNumber(tableNumber)
return cfg != nil &&
fmt.Sprintf("%s%s", cfg.IndexTables.Prefix, strconv.Itoa(int(tableNumber))) == tableName, nil
}
func (t TableRange) ConfigForTableNumber(tableNumber int64) *PeriodConfig {
if t.Start <= tableNumber && tableNumber <= t.End {
return t.PeriodConfig
}
return nil
}
// PeriodConfig defines the schema and tables to use for a period of time
type PeriodConfig struct {
// used when working with config
@ -121,6 +174,13 @@ func (cfg *PeriodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
// GetIndexTableNumberRange returns the table number range calculated based on
// the configured schema start date, index table period and the given schemaEndDate
func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRange {
// non-periodic tables
if cfg.IndexTables.Period == 0 {
return TableRange{
PeriodConfig: cfg,
}
}
return TableRange{
Start: cfg.From.Unix() / int64(cfg.IndexTables.Period/time.Second),
End: schemaEndDate.Unix() / int64(cfg.IndexTables.Period/time.Second),
@ -550,3 +610,21 @@ func newExternalKey(ref logproto.ChunkRef) string {
func newerExternalKey(ref logproto.ChunkRef) string {
return fmt.Sprintf("%s/%x/%x:%x:%x", ref.UserID, ref.Fingerprint, int64(ref.From), int64(ref.Through), ref.Checksum)
}
func GetIndexStoreTableRanges(indexType string, periodicConfigs []PeriodConfig) TableRanges {
var ranges TableRanges
for i := range periodicConfigs {
if periodicConfigs[i].IndexType != indexType {
continue
}
periodEndTime := DayTime{Time: math.MaxInt64}
if i < len(periodicConfigs)-1 {
periodEndTime = DayTime{Time: periodicConfigs[i+1].From.Time.Add(-time.Millisecond)}
}
ranges = append(ranges, periodicConfigs[i].GetIndexTableNumberRange(periodEndTime))
}
return ranges
}

@ -2,6 +2,7 @@ package config
import (
"fmt"
"math"
"testing"
"time"
@ -761,6 +762,7 @@ func TestTableRanges_TableInRange(t *testing.T) {
End: 10,
PeriodConfig: &PeriodConfig{IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: 24 * time.Hour,
}},
},
TableRange{
@ -768,36 +770,217 @@ func TestTableRanges_TableInRange(t *testing.T) {
End: 20,
PeriodConfig: &PeriodConfig{IndexTables: PeriodicTableConfig{
Prefix: "index_foo_",
Period: 24 * time.Hour,
}},
},
}
for i, tc := range []struct {
for _, tc := range []struct {
tableNumber int64
tableName string
expResp bool
expError error
}{
{
tableNumber: 1,
tableName: "index_1",
expResp: true,
tableName: "index_1",
expResp: true,
},
{
tableName: "index_foo_15",
expResp: true,
},
// wrong prefix
{
tableName: "index_foo_5",
},
// wrong prefix
{
tableName: "index_15",
},
// invalid table name
{
tableName: "index_foo",
expError: errInvalidTableName,
},
} {
t.Run(fmt.Sprintf("table %s", tc.tableName), func(t *testing.T) {
ok, err := tableRanges.TableInRange(tc.tableName)
require.Equal(t, tc.expResp, ok)
if tc.expError != nil {
require.ErrorIs(t, err, tc.expError)
} else {
require.NoError(t, err)
}
})
}
}
func TestTableRange_TableInRange(t *testing.T) {
tableRange := TableRange{
Start: 1,
End: 10,
PeriodConfig: &PeriodConfig{IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: 24 * time.Hour,
}},
}
for _, tc := range []struct {
tableName string
expResp bool
expError error
}{
{
tableName: "index_1",
expResp: true,
},
// out of range
{
tableName: "index_12",
},
// wrong prefix
{
tableName: "index_foo_5",
},
// invalid table name
{
tableName: "index_foo",
expError: errInvalidTableName,
},
} {
t.Run(fmt.Sprintf("periodic table %s", tc.tableName), func(t *testing.T) {
ok, err := tableRange.TableInRange(tc.tableName)
require.Equal(t, tc.expResp, ok)
if tc.expError != nil {
require.ErrorIs(t, err, tc.expError)
} else {
require.NoError(t, err)
}
})
}
nonPeriodicTableRange := TableRange{
PeriodConfig: &PeriodConfig{IndexTables: PeriodicTableConfig{
Prefix: "index",
}},
}
for _, tc := range []struct {
tableName string
expResp bool
}{
{
tableNumber: 15,
tableName: "index_foo_15",
expResp: true,
tableName: "index",
expResp: true,
},
{
tableNumber: 25,
tableName: "index_15",
tableName: "index_foo",
},
{
tableNumber: 15,
tableName: "index_15",
tableName: "index_0",
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
require.Equal(t, tc.expResp, tableRanges.TableInRange(tc.tableNumber, tc.tableName))
t.Run(fmt.Sprintf("non-periodic table %s", tc.tableName), func(t *testing.T) {
ok, err := nonPeriodicTableRange.TableInRange(tc.tableName)
require.Equal(t, tc.expResp, ok)
require.NoError(t, err)
})
}
}
func TestGetIndexStoreTableRanges(t *testing.T) {
now := model.Now()
schemaConfig := SchemaConfig{
Configs: []PeriodConfig{
{
From: DayTime{Time: now.Add(30 * 24 * time.Hour)},
IndexType: BoltDBShipperType,
ObjectType: StorageTypeFileSystem,
Schema: "v9",
IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
{
From: DayTime{Time: now.Add(20 * 24 * time.Hour)},
IndexType: BoltDBShipperType,
ObjectType: StorageTypeFileSystem,
Schema: "v11",
IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
{
From: DayTime{Time: now.Add(15 * 24 * time.Hour)},
IndexType: TSDBType,
ObjectType: StorageTypeFileSystem,
Schema: "v11",
IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
{
From: DayTime{Time: now.Add(10 * 24 * time.Hour)},
IndexType: StorageTypeBigTable,
ObjectType: StorageTypeFileSystem,
Schema: "v11",
IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
{
From: DayTime{Time: now.Add(5 * 24 * time.Hour)},
IndexType: TSDBType,
ObjectType: StorageTypeFileSystem,
Schema: "v11",
IndexTables: PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
},
}
require.Equal(t, TableRanges{
{
Start: schemaConfig.Configs[0].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[1].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[0],
},
{
Start: schemaConfig.Configs[1].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[2].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[1],
},
}, GetIndexStoreTableRanges(BoltDBShipperType, schemaConfig.Configs))
require.Equal(t, TableRanges{
{
Start: schemaConfig.Configs[3].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[4].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[3],
},
}, GetIndexStoreTableRanges(StorageTypeBigTable, schemaConfig.Configs))
require.Equal(t, TableRanges{
{
Start: schemaConfig.Configs[2].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[3].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[2],
},
{
Start: schemaConfig.Configs[4].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: model.Time(math.MaxInt64).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[4],
},
}, GetIndexStoreTableRanges(TSDBType, schemaConfig.Configs))
}

@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -36,20 +37,24 @@ import (
)
var (
// BoltDB Shipper is supposed to be run as a singleton.
// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
// in tests for creating multiple instances of it at a time.
boltDBIndexClientWithShipper index.Client
indexGatewayClient index.Client
// singleton for each period
boltdbIndexClientsWithShipper = make(map[config.DayTime]index.Client)
)
// ResetBoltDBIndexClientWithShipper allows to reset the singleton.
// ResetBoltDBIndexClientsWithShipper allows to reset the singletons.
// MUST ONLY BE USED IN TESTS
func ResetBoltDBIndexClientWithShipper() {
if boltDBIndexClientWithShipper == nil {
return
func ResetBoltDBIndexClientsWithShipper() {
for _, client := range boltdbIndexClientsWithShipper {
client.Stop()
}
boltdbIndexClientsWithShipper = make(map[config.DayTime]index.Client)
if indexGatewayClient != nil {
indexGatewayClient.Stop()
indexGatewayClient = nil
}
boltDBIndexClientWithShipper.Stop()
boltDBIndexClientWithShipper = nil
}
// StoreLimits helps get Limits specific to Queries for Stores
@ -252,8 +257,8 @@ func (cfg *Config) Validate() error {
}
// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) (index.Client, error) {
switch name {
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, logger log.Logger) (index.Client, error) {
switch periodCfg.IndexType {
case config.StorageTypeInMemory:
store := testutils.NewMockStorage()
return store, nil
@ -280,33 +285,44 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi
case config.StorageTypeGrpc:
return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
case config.BoltDBShipperType:
if boltDBIndexClientWithShipper != nil {
return boltDBIndexClientWithShipper, nil
}
if shouldUseIndexGatewayClient(cfg.BoltDBShipperConfig.Config) {
gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, util_log.Logger)
if indexGatewayClient != nil {
return indexGatewayClient, nil
}
gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, logger)
if err != nil {
return nil, err
}
boltDBIndexClientWithShipper = gateway
indexGatewayClient = gateway
return gateway, nil
}
objectClient, err := NewObjectClient(cfg.BoltDBShipperConfig.SharedStoreType, cfg, cm)
if client, ok := boltdbIndexClientsWithShipper[periodCfg.From]; ok {
return client, nil
}
objectType := periodCfg.ObjectType
if cfg.BoltDBShipperConfig.SharedStoreType != "" {
objectType = cfg.BoltDBShipperConfig.SharedStoreType
}
objectClient, err := NewObjectClient(objectType, cfg, cm)
if err != nil {
return nil, err
}
tableRanges := getIndexStoreTableRanges(config.BoltDBShipperType, schemaCfg.Configs)
boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits,
ownsTenantFn, tableRanges, registerer)
shipper, err := shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits,
ownsTenantFn, tableRange, registerer, logger)
if err != nil {
return nil, err
}
return boltDBIndexClientWithShipper, err
boltdbIndexClientsWithShipper[periodCfg.From] = shipper
return shipper, nil
default:
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v", name, config.StorageTypeAWS, config.StorageTypeCassandra, config.StorageTypeInMemory, config.StorageTypeGCP, config.StorageTypeBigTable, config.StorageTypeBigTableHashed)
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v", periodCfg.IndexType, config.StorageTypeAWS, config.StorageTypeCassandra, config.StorageTypeInMemory, config.StorageTypeGCP, config.StorageTypeBigTable, config.StorageTypeBigTableHashed)
}
}

@ -114,8 +114,7 @@ func TestNamedStores(t *testing.T) {
},
BoltDBShipperConfig: boltdbShipperConfig,
}
err := cfg.NamedStores.validate()
require.NoError(t, err)
require.NoError(t, cfg.NamedStores.validate())
schemaConfig := config.SchemaConfig{
Configs: []config.PeriodConfig{

@ -76,10 +76,6 @@ type store struct {
logger log.Logger
chunkFilterer chunk.RequestChunkFilterer
// Keep a reference to the tsdb index store as we use one store for multiple schema period configs.
tsdbStore index.ReaderWriter
tsdbStoreStopFunc func()
}
// NewStore creates a new Loki Store using configuration supplied.
@ -155,7 +151,7 @@ func NewStore(cfg Config, storeCfg config.ChunkStoreConfig, schemaCfg config.Sch
}
func (s *store) init() error {
for _, p := range s.schemaCfg.Configs {
for i, p := range s.schemaCfg.Configs {
chunkClient, err := s.chunkClientForPeriod(p)
if err != nil {
return err
@ -165,10 +161,15 @@ func (s *store) init() error {
return err
}
w, idx, stop, err := s.storeForPeriod(p, chunkClient, f)
periodEndTime := config.DayTime{Time: math.MaxInt64}
if i < len(s.schemaCfg.Configs)-1 {
periodEndTime = config.DayTime{Time: s.schemaCfg.Configs[i+1].From.Time.Add(-time.Millisecond)}
}
w, idx, stop, err := s.storeForPeriod(p, p.GetIndexTableNumberRange(periodEndTime), chunkClient, f)
if err != nil {
return err
}
s.composite.AddStore(p.From.Time, f, idx, w, stop)
}
@ -208,7 +209,7 @@ func shouldUseIndexGatewayClient(cfg indexshipper.Config) bool {
return true
}
func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, index.ReaderWriter, func(), error) {
func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, index.ReaderWriter, func(), error) {
indexClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{
"component": fmt.Sprintf(
@ -217,11 +218,12 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
p.From.String(),
),
}, s.registerer)
indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String()))
if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, indexClientLogger)
if err != nil {
return nil, nil, nil, err
}
@ -233,7 +235,12 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
}, nil
}
objectClient, err := NewObjectClient(s.cfg.TSDBShipperConfig.SharedStoreType, s.cfg, s.clientMetrics)
objectType := p.ObjectType
if s.cfg.TSDBShipperConfig.SharedStoreType != "" {
objectType = s.cfg.TSDBShipperConfig.SharedStoreType
}
objectClient, err := NewObjectClient(objectType, s.cfg, s.clientMetrics)
if err != nil {
return nil, nil, nil, err
}
@ -244,41 +251,40 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
pCopy := p
pCopy.IndexType = config.BoltDBShipperType
pCopy.IndexTables.Prefix = fmt.Sprintf("%sbackup_", pCopy.IndexTables.Prefix)
_, backupIndexWriter, backupStoreStop, err = s.storeForPeriod(pCopy, chunkClient, f)
tableRange := tableRange
tableRange.PeriodConfig = &pCopy
_, backupIndexWriter, backupStoreStop, err = s.storeForPeriod(pCopy, tableRange, chunkClient, f)
if err != nil {
return nil, nil, nil, err
}
}
// We should only create one tsdb.Store per storage.Store and reuse it over all TSDB schema periods.
if s.tsdbStore == nil {
indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits,
getIndexStoreTableRanges(config.TSDBType, s.schemaCfg.Configs), backupIndexWriter, indexClientReg)
if err != nil {
return nil, nil, nil, err
}
s.tsdbStore = indexReaderWriter
s.tsdbStoreStopFunc = stopTSDBStoreFunc
indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(fmt.Sprintf("%s_%s", p.ObjectType, p.From.String()), s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits,
tableRange, backupIndexWriter, indexClientReg, indexClientLogger)
if err != nil {
return nil, nil, nil, err
}
indexReaderWriter := index.NewMonitoredReaderWriter(s.tsdbStore, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, s.tsdbStore, s.storeCfg.DisableIndexDeduplication)
indexReaderWriter = index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, indexReaderWriter, s.storeCfg.DisableIndexDeduplication)
return chunkWriter, indexReaderWriter,
func() {
f.Stop()
chunkClient.Stop()
s.tsdbStoreStopFunc()
stopTSDBStoreFunc()
objectClient.Stop()
backupStoreStop()
}, nil
}
idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg)
idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, indexClientLogger)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
}
idx = series_index.NewCachingIndexClient(idx, s.indexReadCache, s.cfg.IndexCacheValidity, s.limits, s.logger, s.cfg.DisableBroadIndexQueries)
idx = series_index.NewCachingIndexClient(idx, s.indexReadCache, s.cfg.IndexCacheValidity, s.limits, indexClientLogger, s.cfg.DisableBroadIndexQueries)
schema, err := series_index.CreateSchema(p)
if err != nil {
return nil, nil, nil, err
@ -543,21 +549,3 @@ func (f failingChunkWriter) Put(_ context.Context, _ []chunk.Chunk) error {
func (f failingChunkWriter) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return errWritingChunkUnsupported
}
func getIndexStoreTableRanges(indexType string, periodicConfigs []config.PeriodConfig) config.TableRanges {
var ranges config.TableRanges
for i := range periodicConfigs {
if periodicConfigs[i].IndexType != indexType {
continue
}
periodEndTime := config.DayTime{Time: math.MaxInt64}
if i < len(periodicConfigs)-1 {
periodEndTime = config.DayTime{Time: periodicConfigs[i+1].From.Time.Add(-time.Millisecond)}
}
ranges = append(ranges, periodicConfigs[i].GetIndexTableNumberRange(periodEndTime))
}
return ranges
}

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log"
"math"
"net/http"
_ "net/http/pprof"
"os"
@ -995,114 +994,130 @@ type timeRange struct {
from, to time.Time
}
func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
tempDir := t.TempDir()
func TestStore_MultiPeriod(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{}, nil)
require.NoError(t, err)
// config for BoltDB Shipper
boltdbShipperConfig := shipper.Config{}
flagext.DefaultValues(&boltdbShipperConfig)
boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
boltdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem
boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache")
boltdbShipperConfig.Mode = indexshipper.ModeReadWrite
// dates for activation of boltdb shippers
firstStoreDate := parseDate("2019-01-01")
secondStoreDate := parseDate("2019-01-02")
cfg := Config{
FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")},
BoltDBShipperConfig: boltdbShipperConfig,
}
schemaConfig := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: timeToModelTime(firstStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: config.StorageTypeFileSystem,
Schema: "v9",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
for name, indexes := range map[string][]string{
"botldb_boltdb": {config.BoltDBShipperType, config.BoltDBShipperType},
"botldb_tsdb": {config.BoltDBShipperType, config.TSDBType},
"tsdb_tsdb": {config.TSDBType, config.TSDBType},
} {
t.Run(name, func(t *testing.T) {
tempDir := t.TempDir()
shipperConfig := indexshipper.Config{}
flagext.DefaultValues(&shipperConfig)
shipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
shipperConfig.CacheLocation = path.Join(tempDir, "cache")
shipperConfig.Mode = indexshipper.ModeReadWrite
cfg := Config{
FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")},
BoltDBShipperConfig: shipper.Config{
Config: shipperConfig,
},
},
{
From: config.DayTime{Time: timeToModelTime(secondStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
TSDBShipperConfig: shipperConfig,
NamedStores: NamedStores{
Filesystem: map[string]local.FSConfig{
"named-store": {Directory: path.Join(tempDir, "named-store")},
},
},
RowShards: 2,
},
},
}
}
require.NoError(t, cfg.NamedStores.validate())
ResetBoltDBIndexClientWithShipper()
store, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
require.NoError(t, err)
schemaConfig := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: timeToModelTime(firstStoreDate)},
IndexType: indexes[0],
ObjectType: config.StorageTypeFileSystem,
Schema: "v9",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
{
From: config.DayTime{Time: timeToModelTime(secondStoreDate)},
IndexType: indexes[1],
ObjectType: "named-store",
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
},
}
// time ranges adding a chunk for each store and a chunk which overlaps both the stores
chunksToBuildForTimeRanges := []timeRange{
{
// chunk just for first store
secondStoreDate.Add(-3 * time.Hour),
secondStoreDate.Add(-2 * time.Hour),
},
{
// chunk overlapping both the stores
secondStoreDate.Add(-time.Hour),
secondStoreDate.Add(time.Hour),
},
{
// chunk just for second store
secondStoreDate.Add(2 * time.Hour),
secondStoreDate.Add(3 * time.Hour),
},
}
ResetBoltDBIndexClientsWithShipper()
store, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
require.NoError(t, err)
// build and add chunks to the store
addedChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {
chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
// time ranges adding a chunk for each store and a chunk which overlaps both the stores
chunksToBuildForTimeRanges := []timeRange{
{
// chunk just for first store
secondStoreDate.Add(-3 * time.Hour),
secondStoreDate.Add(-2 * time.Hour),
},
{
// chunk overlapping both the stores
secondStoreDate.Add(-time.Hour),
secondStoreDate.Add(time.Hour),
},
{
// chunk just for second store
secondStoreDate.Add(2 * time.Hour),
secondStoreDate.Add(3 * time.Hour),
},
}
err := store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
// build and add chunks to the store
addedChunkIDs := map[string]struct{}{}
for _, tr := range chunksToBuildForTimeRanges {
chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
addedChunkIDs[schemaConfig.ExternalKey(chk.ChunkRef)] = struct{}{}
}
err := store.PutOne(ctx, chk.From, chk.Through, chk)
require.NoError(t, err)
// recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup.
store.Stop()
addedChunkIDs[schemaConfig.ExternalKey(chk.ChunkRef)] = struct{}{}
}
store, err = NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
require.NoError(t, err)
// recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup.
store.Stop()
defer store.Stop()
ResetBoltDBIndexClientsWithShipper()
store, err = NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
require.NoError(t, err)
// get all the chunks from both the stores
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
totalChunks += len(chks)
}
// we get common chunk twice because it is indexed in both the stores
require.Equal(t, totalChunks, len(addedChunkIDs)+1)
defer store.Stop()
// check whether we got back all the chunks which were added
for i := range chunks {
for _, c := range chunks[i] {
_, ok := addedChunkIDs[schemaConfig.ExternalKey(c.ChunkRef)]
require.True(t, ok)
}
// get all the chunks from both the stores
chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
require.NoError(t, err)
var totalChunks int
for _, chks := range chunks {
totalChunks += len(chks)
}
// we get common chunk twice because it is indexed in both the stores
require.Equal(t, totalChunks, len(addedChunkIDs)+1)
// check whether we got back all the chunks which were added
for i := range chunks {
for _, c := range chunks[i] {
_, ok := addedChunkIDs[schemaConfig.ExternalKey(c.ChunkRef)]
require.True(t, ok)
}
}
})
}
}
func mustParseLabels(s string) map[string]string {
@ -1286,102 +1301,6 @@ func Test_GetSeries(t *testing.T) {
}
}
func TestGetIndexStoreTableRanges(t *testing.T) {
now := model.Now()
schemaConfig := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: now.Add(30 * 24 * time.Hour)},
IndexType: config.BoltDBShipperType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v9",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
{
From: config.DayTime{Time: now.Add(20 * 24 * time.Hour)},
IndexType: config.BoltDBShipperType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
{
From: config.DayTime{Time: now.Add(15 * 24 * time.Hour)},
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
{
From: config.DayTime{Time: now.Add(10 * 24 * time.Hour)},
IndexType: config.StorageTypeBigTable,
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
{
From: config.DayTime{Time: now.Add(5 * 24 * time.Hour)},
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
RowShards: 2,
},
},
}
require.Equal(t, config.TableRanges{
{
Start: schemaConfig.Configs[0].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[1].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[0],
},
{
Start: schemaConfig.Configs[1].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[2].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[1],
},
}, getIndexStoreTableRanges(config.BoltDBShipperType, schemaConfig.Configs))
require.Equal(t, config.TableRanges{
{
Start: schemaConfig.Configs[3].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[4].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[3],
},
}, getIndexStoreTableRanges(config.StorageTypeBigTable, schemaConfig.Configs))
require.Equal(t, config.TableRanges{
{
Start: schemaConfig.Configs[2].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: schemaConfig.Configs[3].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[2],
},
{
Start: schemaConfig.Configs[4].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
End: model.Time(math.MaxInt64).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second),
PeriodConfig: &schemaConfig.Configs[4],
},
}, getIndexStoreTableRanges(config.TSDBType, schemaConfig.Configs))
}
func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
tempDir := t.TempDir()
@ -1443,7 +1362,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
},
}
ResetBoltDBIndexClientWithShipper()
ResetBoltDBIndexClientsWithShipper()
store, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
require.NoError(t, err)

@ -5,20 +5,19 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
@ -27,9 +26,6 @@ const (
daySeconds = int64(24 * time.Hour / time.Second)
)
// regexp for finding the trailing index bucket number at the end of table name
var extractTableNumberRegex = regexp.MustCompile(`[0-9]+$`)
type Limits interface {
AllByUserID() map[string]*validation.Limits
DefaultLimits() *validation.Limits
@ -55,14 +51,15 @@ type Config struct {
}
type tableManager struct {
cfg Config
openIndexFileFunc index.OpenIndexFileFunc
indexStorageClient storage.Client
tableRangesToHandle config.TableRanges
cfg Config
openIndexFileFunc index.OpenIndexFileFunc
indexStorageClient storage.Client
tableRangeToHandle config.TableRange
tables map[string]Table
tablesMtx sync.RWMutex
metrics *metrics
logger log.Logger
ctx context.Context
cancel context.CancelFunc
@ -72,22 +69,23 @@ type tableManager struct {
}
func NewTableManager(cfg Config, openIndexFileFunc index.OpenIndexFileFunc, indexStorageClient storage.Client,
ownsTenantFn IndexGatewayOwnsTenant, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) (TableManager, error) {
ownsTenantFn IndexGatewayOwnsTenant, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (TableManager, error) {
if err := util.EnsureDirectory(cfg.CacheDir); err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
tm := &tableManager{
cfg: cfg,
openIndexFileFunc: openIndexFileFunc,
indexStorageClient: indexStorageClient,
tableRangesToHandle: tableRangesToHandle,
ownsTenant: ownsTenantFn,
tables: make(map[string]Table),
metrics: newMetrics(reg),
ctx: ctx,
cancel: cancel,
cfg: cfg,
openIndexFileFunc: openIndexFileFunc,
indexStorageClient: indexStorageClient,
tableRangeToHandle: tableRangeToHandle,
ownsTenant: ownsTenantFn,
tables: make(map[string]Table),
metrics: newMetrics(reg),
logger: logger,
ctx: ctx,
cancel: cancel,
}
// load the existing tables first.
@ -125,18 +123,18 @@ func (tm *tableManager) loop() {
case <-syncTicker.C:
err := tm.syncTables(tm.ctx)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err)
level.Error(tm.logger).Log("msg", "error syncing local boltdb files with storage", "err", err)
}
// we need to keep ensuring query readiness to download each day's new table, which would otherwise be downloaded only during queries.
err = tm.ensureQueryReadiness(tm.ctx)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error ensuring query readiness of tables", "err", err)
level.Error(tm.logger).Log("msg", "error ensuring query readiness of tables", "err", err)
}
case <-cacheCleanupTicker.C:
err := tm.cleanupCache()
if err != nil {
level.Error(util_log.Logger).Log("msg", "error cleaning up expired tables", "err", err)
level.Error(tm.logger).Log("msg", "error cleaning up expired tables", "err", err)
}
case <-tm.ctx.Done():
return
@ -187,7 +185,7 @@ func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {
table, ok = tm.tables[tableName]
if !ok {
// table not found, creating one.
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName))
level.Info(tm.logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName))
tablePath := filepath.Join(tm.cfg.CacheDir, tableName)
err := util.EnsureDirectory(tablePath)
@ -220,7 +218,7 @@ func (tm *tableManager) syncTables(ctx context.Context) error {
tm.metrics.tablesDownloadOperationDurationSeconds.Set(time.Since(start).Seconds())
}()
level.Info(util_log.Logger).Log("msg", "syncing tables")
level.Info(tm.logger).Log("msg", "syncing tables")
for _, table := range tm.tables {
err := table.Sync(ctx)
@ -236,10 +234,10 @@ func (tm *tableManager) cleanupCache() error {
tm.tablesMtx.Lock()
defer tm.tablesMtx.Unlock()
level.Info(util_log.Logger).Log("msg", "cleaning tables cache")
level.Info(tm.logger).Log("msg", "cleaning tables cache")
for name, table := range tm.tables {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("cleaning up expired table %s", name))
level.Info(tm.logger).Log("msg", fmt.Sprintf("cleaning up expired table %s", name))
isEmpty, err := table.DropUnusedIndex(tm.cfg.CacheTTL, time.Now())
if err != nil {
return err
@ -259,7 +257,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
distinctUsers := make(map[string]struct{})
defer func() {
level.Info(util_log.Logger).Log("msg", "query readiness setup completed", "duration", time.Since(start), "distinct_users_len", len(distinctUsers))
level.Info(tm.logger).Log("msg", "query readiness setup completed", "duration", time.Since(start), "distinct_users_len", len(distinctUsers))
}()
activeTableNumber := getActiveTableNumber()
@ -291,15 +289,25 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
}
for _, tableName := range tables {
tableNumber, err := extractTableNumberFromName(tableName)
if err != nil {
return err
if tableName == deletion.DeleteRequestsTableName {
continue
}
if tableNumber == -1 || !tm.tableRangesToHandle.TableInRange(tableNumber, tableName) {
if ok, err := tm.tableRangeToHandle.TableInRange(tableName); !ok {
if err != nil {
level.Error(tm.logger).Log("msg", "failed to run query readiness for table", "table-name", tableName, "err", err)
} else {
level.Debug(tm.logger).Log("msg", "skipping query readiness. table not in range", "table-name", tableName)
}
continue
}
tableNumber, err := config.ExtractTableNumberFromName(tableName)
if err != nil {
return fmt.Errorf("cannot extract table number from %s: %w", tableName, err)
}
// continue if the table is outside the query readiness window
if activeTableNumber-tableNumber > int64(largestQueryReadinessNum) {
continue
@ -338,7 +346,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
}
ensureQueryReadinessDuration := time.Since(operationStart)
level.Info(util_log.Logger).Log(
level.Info(tm.logger).Log(
"msg", "index pre-download for query readiness completed",
"users_len", len(usersToBeQueryReadyFor),
"query_readiness_duration", ensureQueryReadinessDuration,
@ -393,15 +401,17 @@ func (tm *tableManager) loadLocalTables() error {
continue
}
tableNumber, err := extractTableNumberFromName(entry.Name())
if err != nil {
return err
}
if tableNumber == -1 || !tm.tableRangesToHandle.TableInRange(tableNumber, entry.Name()) {
if ok, err := tm.tableRangeToHandle.TableInRange(entry.Name()); !ok {
if err != nil {
level.Error(tm.logger).Log("msg", "failed to load table", "table-name", entry.Name(), "err", err)
} else {
level.Debug(tm.logger).Log("msg", "skip loading table as it is not in range", "table-name", entry.Name())
}
continue
}
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("loading local table %s", entry.Name()))
level.Info(tm.logger).Log("msg", fmt.Sprintf("loading local table %s", entry.Name()))
table, err := LoadTable(entry.Name(), filepath.Join(tm.cfg.CacheDir, entry.Name()),
tm.indexStorageClient, tm.openIndexFileFunc, tm.metrics)
@ -415,21 +425,6 @@ func (tm *tableManager) loadLocalTables() error {
return nil
}
// extractTableNumberFromName extract the table number from a given tableName.
// if the tableName doesn't match the regex, it would return -1 as table number.
func extractTableNumberFromName(tableName string) (int64, error) {
match := extractTableNumberRegex.Find([]byte(tableName))
if match == nil {
return -1, nil
}
tableNumber, err := strconv.ParseInt(string(match), 10, 64)
if err != nil {
return -1, err
}
return tableNumber, nil
}
func getActiveTableNumber() int64 {
return getTableNumberForTime(model.Now())
}
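
The `TableInRange` checks above replace the removed regex-based `extractTableNumberFromName` helper. Below is a minimal sketch of the idea, assuming a range owns tables whose prefix matches its period config and whose trailing number falls within `[Start, End]`; the type and method here are illustrative, not the actual `config.TableRange` implementation:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// tableRange mirrors the shape of config.TableRange for illustration only.
type tableRange struct {
	Start, End int64
	Prefix     string // index table prefix from the period config, e.g. "index_"
}

// tableInRange strips the period's prefix, parses the trailing table
// number and checks it against the range owned by this manager.
func (r tableRange) tableInRange(tableName string) (bool, error) {
	if !strings.HasPrefix(tableName, r.Prefix) {
		return false, nil // table belongs to a period with a different prefix
	}
	n, err := strconv.ParseInt(strings.TrimPrefix(tableName, r.Prefix), 10, 64)
	if err != nil {
		return false, fmt.Errorf("cannot extract table number from %s: %w", tableName, err)
	}
	return r.Start <= n && n <= r.End, nil
}

func main() {
	r := tableRange{Start: 19000, End: 19500, Prefix: "index_"}
	fmt.Println(r.tableInRange("index_19333")) // true <nil>
	fmt.Println(r.tableInRange("index_19999")) // false <nil>
	fmt.Println(r.tableInRange("loki_19333"))  // false <nil>
}
```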

@ -8,6 +8,7 @@ import (
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
@ -21,6 +22,7 @@ const (
objectsStorageDirName = "objects"
cacheDirName = "cache"
indexTablePrefix = "table_"
indexTablePeriod = 24 * time.Hour
)
func buildTestStorageClient(t *testing.T, path string) storage.Client {
@ -33,7 +35,7 @@ func buildTestStorageClient(t *testing.T, path string) storage.Client {
type stopFunc func()
func buildTestTableManager(t *testing.T, path string, tableRangesToHandle config.TableRanges) (*tableManager, stopFunc) {
func buildTestTableManager(t *testing.T, path string, tableRangeToHandle *config.TableRange) (*tableManager, stopFunc) {
indexStorageClient := buildTestStorageClient(t, path)
cachePath := filepath.Join(path, cacheDirName)
@ -44,20 +46,21 @@ func buildTestTableManager(t *testing.T, path string, tableRangesToHandle config
Limits: &mockLimits{},
}
if tableRangesToHandle == nil {
tableRangesToHandle = config.TableRanges{
{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: indexTablePrefix},
if tableRangeToHandle == nil {
tableRangeToHandle = &config.TableRange{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
Period: indexTablePeriod,
},
},
}
}
tblManager, err := NewTableManager(cfg, func(s string) (index.Index, error) {
return openMockIndexFile(t, s), nil
}, indexStorageClient, nil, tableRangesToHandle, nil)
}, indexStorageClient, nil, *tableRangeToHandle, nil, log.NewNopLogger())
require.NoError(t, err)
return tblManager.(*tableManager), func() {
@ -138,11 +141,12 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) {
cfg: cfg,
indexStorageClient: mockIndexStorageClient,
tables: make(map[string]Table),
tableRangesToHandle: config.TableRanges{{
tableRangeToHandle: config.TableRange{
Start: 0, End: math.MaxInt64, PeriodConfig: &config.PeriodConfig{},
}},
},
ctx: context.Background(),
cancel: func() {},
logger: log.NewNopLogger(),
}
// setup 10 tables with 5 latest tables having user index for user1 and user2
@ -166,7 +170,7 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) {
name string
queryReadyNumDaysCfg int
queryReadinessLimits mockLimits
tableRangesToHandle config.TableRanges
tableRangeToHandle *config.TableRange
expectedQueryReadinessDoneForUsers map[string][]string
}{
@ -270,29 +274,19 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) {
{
name: "common index: 20 days",
queryReadyNumDaysCfg: 20,
tableRangesToHandle: config.TableRanges{
{
End: buildTableNumber(0),
Start: buildTableNumber(4),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: indexTablePrefix},
},
},
{
End: buildTableNumber(7),
Start: buildTableNumber(9),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: indexTablePrefix},
tableRangeToHandle: &config.TableRange{
End: buildTableNumber(5),
Start: buildTableNumber(9),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
Period: indexTablePeriod,
},
},
},
expectedQueryReadinessDoneForUsers: map[string][]string{
buildTableName(0): {},
buildTableName(1): {},
buildTableName(2): {},
buildTableName(3): {},
buildTableName(4): {},
buildTableName(5): {},
buildTableName(6): {},
buildTableName(7): {},
buildTableName(8): {},
buildTableName(9): {},
@ -304,27 +298,20 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) {
queryReadyIndexNumDaysDefault: 2,
},
queryReadyNumDaysCfg: 5,
tableRangesToHandle: config.TableRanges{
{
End: buildTableNumber(0),
Start: buildTableNumber(1),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: indexTablePrefix},
},
},
{
End: buildTableNumber(4),
Start: buildTableNumber(5),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: indexTablePrefix},
tableRangeToHandle: &config.TableRange{
End: buildTableNumber(2),
Start: buildTableNumber(4),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
Period: indexTablePeriod,
},
},
},
expectedQueryReadinessDoneForUsers: map[string][]string{
buildTableName(0): {"user1", "user2"},
buildTableName(1): {"user1", "user2"},
buildTableName(2): {"user1", "user2"},
buildTableName(3): {},
buildTableName(4): {},
buildTableName(5): {},
},
},
} {
@ -333,14 +320,17 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) {
resetTables()
tableManager.cfg.QueryReadyNumDays = tc.queryReadyNumDaysCfg
tableManager.cfg.Limits = &tc.queryReadinessLimits
if tc.tableRangesToHandle == nil {
tableManager.tableRangesToHandle = config.TableRanges{{
if tc.tableRangeToHandle == nil {
tableManager.tableRangeToHandle = config.TableRange{
Start: 0, End: math.MaxInt64, PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: indexTablePrefix},
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
Period: indexTablePeriod,
},
},
}}
}
} else {
tableManager.tableRangesToHandle = tc.tableRangesToHandle
tableManager.tableRangeToHandle = *tc.tableRangeToHandle
}
require.NoError(t, tableManager.ensureQueryReadiness(context.Background()))
@ -388,32 +378,22 @@ func TestTableManager_loadTables(t *testing.T) {
stopFunc()
tableManager, stopFunc = buildTestTableManager(t, tempDir, config.TableRanges{
{
End: buildTableNumber(0),
Start: buildTableNumber(1),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
},
},
},
{
End: buildTableNumber(5),
Start: buildTableNumber(8),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
},
tableManager, stopFunc = buildTestTableManager(t, tempDir, &config.TableRange{
End: buildTableNumber(4),
Start: buildTableNumber(8),
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
Period: indexTablePeriod,
},
},
})
},
)
defer stopFunc()
require.Equal(t, 6, len(tableManager.tables))
require.Equal(t, 5, len(tableManager.tables))
tables = []string{
buildTableName(0),
buildTableName(1),
buildTableName(4),
buildTableName(5),
buildTableName(6),
buildTableName(7),

@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
@ -21,7 +22,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/uploads"
util_log "github.com/grafana/loki/pkg/util/log"
)
type Mode string
@ -134,6 +134,7 @@ type indexShipper struct {
uploadsManager uploads.TableManager
downloadsManager downloads.TableManager
logger log.Logger
stopOnce sync.Once
}
@ -144,7 +145,7 @@ type indexShipper struct {
// it accepts a range of table numbers (config.TableRange) to be managed by the shipper.
// This is mostly useful on the read path to sync and manage specific index tables within the given table number range.
func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, open index.OpenIndexFileFunc, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) (IndexShipper, error) {
ownsTenantFn downloads.IndexGatewayOwnsTenant, open index.OpenIndexFileFunc, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (IndexShipper, error) {
switch cfg.Mode {
case ModeReadOnly, ModeWriteOnly, ModeReadWrite:
default:
@ -153,20 +154,21 @@ func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downl
shipper := indexShipper{
cfg: cfg,
openIndexFileFunc: open,
logger: logger,
}
err := shipper.init(storageClient, limits, ownsTenantFn, tableRangesToHandle, reg)
err := shipper.init(storageClient, limits, ownsTenantFn, tableRangeToHandle, reg)
if err != nil {
return nil, err
}
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("starting index shipper in %s mode", cfg.Mode))
level.Info(shipper.logger).Log("msg", fmt.Sprintf("starting index shipper in %s mode", cfg.Mode))
return &shipper, nil
}
func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) error {
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRangeToHandle config.TableRange, reg prometheus.Registerer) error {
indexStorageClient := storage.NewIndexStorageClient(storageClient, s.cfg.SharedStoreKeyPrefix)
if s.cfg.Mode != ModeReadOnly {
@ -174,7 +176,7 @@ func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.
UploadInterval: UploadInterval,
DBRetainPeriod: s.cfg.IngesterDBRetainPeriod,
}
uploadsManager, err := uploads.NewTableManager(cfg, indexStorageClient, reg)
uploadsManager, err := uploads.NewTableManager(cfg, indexStorageClient, reg, s.logger)
if err != nil {
return err
}
@ -190,7 +192,7 @@ func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.
QueryReadyNumDays: s.cfg.QueryReadyNumDays,
Limits: limits,
}
downloadsManager, err := downloads.NewTableManager(cfg, s.openIndexFileFunc, indexStorageClient, ownsTenantFn, tableRangesToHandle, reg)
downloadsManager, err := downloads.NewTableManager(cfg, s.openIndexFileFunc, indexStorageClient, ownsTenantFn, tableRangeToHandle, reg, s.logger)
if err != nil {
return err
}
@ -254,3 +256,14 @@ func (s *indexShipper) stop() {
s.downloadsManager.Stop()
}
}
type Noop struct{}
func (Noop) AddIndex(tableName, userID string, index index.Index) error { return nil }
func (Noop) ForEach(ctx context.Context, tableName, userID string, callback index.ForEachIndexCallback) error {
return nil
}
func (Noop) ForEachConcurrent(ctx context.Context, tableName, userID string, callback index.ForEachIndexCallback) error {
return nil
}
func (Noop) Stop() {}
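
To make the multi-store wiring concrete, here is a hedged sketch of building one shipper per period, each scoped to its own table-number range. It follows only the signatures visible in this diff (the real wiring lives in `pkg/storage/factory.go`); the directory layout and the no-op limits stub are assumptions for illustration:

```go
package sketch

import (
	"math"
	"path/filepath"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"

	"github.com/grafana/loki/pkg/storage/chunk/client/local"
	"github.com/grafana/loki/pkg/storage/config"
	"github.com/grafana/loki/pkg/storage/stores/indexshipper"
	"github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile"
	"github.com/grafana/loki/pkg/validation"
)

// noLimits is a stand-in for a real downloads.Limits implementation.
type noLimits struct{}

func (noLimits) AllByUserID() map[string]*validation.Limits { return nil }
func (noLimits) DefaultLimits() *validation.Limits          { return &validation.Limits{} }

// buildShippers constructs one index-shipper per period config so that each
// period's tables can be synced with that period's own object store.
func buildShippers(dir string, schemaCfg config.SchemaConfig) ([]indexshipper.IndexShipper, error) {
	shipperCfg := indexshipper.Config{}
	flagext.DefaultValues(&shipperCfg)
	shipperCfg.Mode = indexshipper.ModeReadWrite
	shipperCfg.ActiveIndexDirectory = filepath.Join(dir, "index")
	shipperCfg.CacheLocation = filepath.Join(dir, "cache")

	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: filepath.Join(dir, "objects")})
	if err != nil {
		return nil, err
	}

	var shippers []indexshipper.IndexShipper
	for i, pc := range schemaCfg.Configs {
		// a period handles tables up to the start of the next period;
		// the last period is open-ended.
		periodEnd := config.DayTime{Time: math.MaxInt64}
		if i < len(schemaCfg.Configs)-1 {
			periodEnd = config.DayTime{Time: schemaCfg.Configs[i+1].From.Time.Add(-time.Millisecond)}
		}
		s, err := indexshipper.NewIndexShipper(shipperCfg, objectClient, noLimits{}, nil,
			indexfile.OpenIndexFile, pc.GetIndexTableNumberRange(periodEnd), nil, log.NewNopLogger())
		if err != nil {
			return nil, err
		}
		shippers = append(shippers, s)
	}
	return shippers, nil
}
```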

@ -5,12 +5,12 @@ import (
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)
type Config struct {
@ -31,19 +31,21 @@ type tableManager struct {
tables map[string]Table
tablesMtx sync.RWMutex
metrics *metrics
logger log.Logger
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewTableManager(cfg Config, storageClient storage.Client, reg prometheus.Registerer) (TableManager, error) {
func NewTableManager(cfg Config, storageClient storage.Client, reg prometheus.Registerer, logger log.Logger) (TableManager, error) {
ctx, cancel := context.WithCancel(context.Background())
tm := tableManager{
cfg: cfg,
storageClient: storageClient,
tables: map[string]Table{},
metrics: newMetrics(reg),
logger: logger,
ctx: ctx,
cancel: cancel,
}
@ -72,7 +74,7 @@ func (tm *tableManager) loop() {
}
func (tm *tableManager) Stop() {
level.Info(util_log.Logger).Log("msg", "stopping table manager")
level.Info(tm.logger).Log("msg", "stopping table manager")
tm.cancel()
tm.wg.Wait()
@ -131,14 +133,14 @@ func (tm *tableManager) uploadTables(ctx context.Context) {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()
level.Info(util_log.Logger).Log("msg", "uploading tables")
level.Info(tm.logger).Log("msg", "uploading tables")
status := statusSuccess
for _, table := range tm.tables {
err := table.Upload(ctx)
if err != nil {
status = statusFailure
level.Error(util_log.Logger).Log("msg", "failed to upload table", "table", table.Name(), "err", err)
level.Error(tm.logger).Log("msg", "failed to upload table", "table", table.Name(), "err", err)
continue
}
@ -146,7 +148,7 @@ func (tm *tableManager) uploadTables(ctx context.Context) {
err = table.Cleanup(tm.cfg.DBRetainPeriod)
if err != nil {
// we do not want to stop uploading of dbs due to failures in cleaning them up, so we just log the error here.
level.Error(util_log.Logger).Log("msg", "failed to cleanup uploaded index past their retention period", "table", table.Name(), "err", err)
level.Error(tm.logger).Log("msg", "failed to cleanup uploaded index past their retention period", "table", table.Name(), "err", err)
}
}

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
@ -32,7 +33,7 @@ func buildTestTableManager(t *testing.T, testDir string) (TableManager, stopFunc
cfg := Config{
UploadInterval: time.Hour,
}
tm, err := NewTableManager(cfg, storageClient, nil)
tm, err := NewTableManager(cfg, storageClient, nil, log.NewNopLogger())
require.NoError(t, err)
return tm, func() {

@ -2,7 +2,6 @@ package index
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@ -10,16 +9,18 @@ import (
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/series/index"
util_log "github.com/grafana/loki/pkg/util/log"
)
type Config struct {
@ -33,9 +34,11 @@ type TableManager struct {
cfg Config
indexShipper Shipper
metrics *metrics
tables map[string]*Table
tablesMtx sync.RWMutex
metrics *metrics
logger log.Logger
tables map[string]*Table
tableRange config.TableRange
tablesMtx sync.RWMutex
ctx context.Context
cancel context.CancelFunc
@ -47,7 +50,7 @@ type Shipper interface {
ForEach(ctx context.Context, tableName, userID string, callback shipper_index.ForEachIndexCallback) error
}
func NewTableManager(cfg Config, indexShipper Shipper, registerer prometheus.Registerer) (*TableManager, error) {
func NewTableManager(cfg Config, indexShipper Shipper, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (*TableManager, error) {
err := chunk_util.EnsureDirectory(cfg.IndexDir)
if err != nil {
return nil, err
@ -58,8 +61,10 @@ func NewTableManager(cfg Config, indexShipper Shipper, registerer prometheus.Reg
cfg: cfg,
indexShipper: indexShipper,
metrics: newMetrics(registerer),
tableRange: tableRange,
ctx: ctx,
cancel: cancel,
logger: logger,
}
tables, err := tm.loadTables()
@ -92,7 +97,7 @@ func (tm *TableManager) loop() {
}
func (tm *TableManager) Stop() {
level.Info(util_log.Logger).Log("msg", "stopping table manager")
level.Info(tm.logger).Log("msg", "stopping table manager")
tm.cancel()
tm.wg.Wait()
@ -163,20 +168,20 @@ func (tm *TableManager) handoverIndexesToShipper(force bool) {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()
level.Info(util_log.Logger).Log("msg", "handing over indexes to shipper")
level.Info(tm.logger).Log("msg", "handing over indexes to shipper")
for _, table := range tm.tables {
err := table.HandoverIndexesToShipper(force)
if err != nil {
// continue handing over other tables while skipping cleanup for a failed one.
level.Error(util_log.Logger).Log("msg", "failed to handover index", "table", table.name, "err", err)
level.Error(tm.logger).Log("msg", "failed to handover index", "table", table.name, "err", err)
continue
}
err = table.Snapshot()
if err != nil {
// we do not want to stop the handover of indexes due to failures in snapshotting them, so we just log the error here.
level.Error(util_log.Logger).Log("msg", "failed to snapshot table for reads", "table", table.name, "err", err)
level.Error(tm.logger).Log("msg", "failed to snapshot table for reads", "table", table.name, "err", err)
}
}
}
@ -199,10 +204,20 @@ func (tm *TableManager) loadTables() (map[string]*Table, error) {
continue
}
if ok, err := tm.tableRange.TableInRange(entry.Name()); !ok {
if err != nil {
level.Error(tm.logger).Log("msg", "failed to load table", "table-name", entry.Name(), "err", err)
} else {
level.Debug(tm.logger).Log("msg", "skip loading table as it is not in range", "table-name", entry.Name())
}
continue
}
// since we are moving to keeping files for the same table in a folder, if the current element is a file we need to move it inside a directory with the same name
// i.e. file index_123 would be moved to path index_123/index_123.
if !entry.IsDir() {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("found a legacy file %s, moving it to folder with same name", entry.Name()))
level.Info(tm.logger).Log("msg", fmt.Sprintf("found a legacy file %s, moving it to folder with same name", entry.Name()))
filePath := filepath.Join(tm.cfg.IndexDir, entry.Name())
// create a folder with a .temp suffix since we can't create a directory with the same name as the file.
@ -222,7 +237,7 @@ func (tm *TableManager) loadTables() (map[string]*Table, error) {
}
}
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("loading table %s", entry.Name()))
level.Info(tm.logger).Log("msg", fmt.Sprintf("loading table %s", entry.Name()))
table, err := LoadTable(filepath.Join(tm.cfg.IndexDir, entry.Name()), tm.cfg.Uploader, tm.indexShipper, tm.cfg.MakePerTenantBuckets, tm.metrics)
if err != nil {
return nil, err
@ -232,7 +247,7 @@ func (tm *TableManager) loadTables() (map[string]*Table, error) {
// if the table is nil it has no files in it, so remove the folder for that table.
err := os.Remove(filepath.Join(tm.cfg.IndexDir, entry.Name()))
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to remove empty table folder", "table", entry.Name(), "err", err)
level.Error(tm.logger).Log("msg", "failed to remove empty table folder", "table", entry.Name(), "err", err)
}
continue
}

@ -2,21 +2,27 @@ package index
import (
"context"
"math"
"os"
"path/filepath"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
)
const indexTablePeriod = 24 * time.Hour
func buildTestTableManager(t *testing.T, testDir string) (*TableManager, stopFunc) {
defer func() {
require.NoError(t, os.RemoveAll(testDir))
@ -29,7 +35,15 @@ func buildTestTableManager(t *testing.T, testDir string) (*TableManager, stopFun
Uploader: "test-table-manager",
IndexDir: indexPath,
}
tm, err := NewTableManager(cfg, mockIndexShipper, nil)
tableRange := config.TableRange{
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: indexTablePeriod,
}},
}
tm, err := NewTableManager(cfg, mockIndexShipper, tableRange, nil, log.NewNopLogger())
require.NoError(t, err)
return tm, tm.Stop
@ -42,11 +56,12 @@ func TestLoadTables(t *testing.T) {
indexPath := filepath.Join(testDir, indexDirName)
require.NoError(t, util.EnsureDirectory(indexPath))
// add a legacy db which is outside of table specific folder
testutil.AddRecordsToDB(t, filepath.Join(indexPath, "table0"), 0, 10, nil)
// add a legacy db which is outside of table specific folder
testutil.AddRecordsToDB(t, filepath.Join(indexPath, "table1"), 0, 10, nil)
// table1 with 2 dbs
testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "table1"), map[string]testutil.DBConfig{
// table2 with 2 dbs
testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "table2"), map[string]testutil.DBConfig{
"db1": {
DBRecords: testutil.DBRecords{
Start: 10,
@ -61,8 +76,8 @@ func TestLoadTables(t *testing.T) {
},
}, nil)
// table2 with 2 dbs
testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "table2"), map[string]testutil.DBConfig{
// table3 with 2 dbs
testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "table3"), map[string]testutil.DBConfig{
"db1": {
DBRecords: testutil.DBRecords{
Start: 30,
@ -77,12 +92,28 @@ func TestLoadTables(t *testing.T) {
},
}, nil)
// table4 with 2 dbs
testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "table4"), map[string]testutil.DBConfig{
"db1": {
DBRecords: testutil.DBRecords{
Start: 50,
NumRecords: 10,
},
},
"db2": {
DBRecords: testutil.DBRecords{
Start: 60,
NumRecords: 10,
},
},
}, nil)
expectedTables := map[string]struct {
start, numRecords int
}{
"table0": {start: 0, numRecords: 10},
"table1": {start: 10, numRecords: 20},
"table2": {start: 30, numRecords: 20},
"table1": {start: 0, numRecords: 10},
"table2": {start: 10, numRecords: 20},
"table3": {start: 30, numRecords: 20},
}
cfg := Config{
@ -90,13 +121,21 @@ func TestLoadTables(t *testing.T) {
IndexDir: indexPath,
}
tm, err := NewTableManager(cfg, mockIndexShipper, nil)
tableRange := config.TableRange{
Start: 1,
End: 3,
PeriodConfig: &config.PeriodConfig{IndexTables: config.PeriodicTableConfig{
Prefix: "table",
Period: indexTablePeriod,
}},
}
tm, err := NewTableManager(cfg, mockIndexShipper, tableRange, nil, log.NewNopLogger())
require.NoError(t, err)
defer tm.Stop()
require.Len(t, tm.tables, len(expectedTables))
stat, err := os.Stat(filepath.Join(indexPath, "table0", "table0"))
stat, err := os.Stat(filepath.Join(indexPath, "table1", "table1"))
require.NoError(t, err)
require.True(t, !stat.IsDir())

@ -3,9 +3,11 @@ package indexgateway
import (
"context"
"fmt"
"sort"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
@ -17,9 +19,11 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/util/spanlogger"
)
const (
@ -40,11 +44,16 @@ type IndexClient interface {
Stop()
}
type IndexClientWithRange struct {
IndexClient
TableRange config.TableRange
}
type Gateway struct {
services.Service
indexQuerier IndexQuerier
indexClient IndexClient
indexClients []IndexClientWithRange
cfg Config
log log.Logger
@ -56,20 +65,24 @@ type Gateway struct {
//
// In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started.
// Otherwise, it starts an Idle Service that doesn't have lifecycle hooks.
func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClient IndexClient) (*Gateway, error) {
if indexClient == nil {
indexClient = failingIndexClient{}
}
func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) {
g := &Gateway{
indexQuerier: indexQuerier,
cfg: cfg,
log: log,
indexClient: indexClient,
indexClients: indexClients,
}
// query newer periods first
sort.Slice(g.indexClients, func(i, j int) bool {
return g.indexClients[i].TableRange.Start > g.indexClients[j].TableRange.Start
})
g.Service = services.NewIdleService(nil, func(failureCase error) error {
g.indexQuerier.Stop()
g.indexClient.Stop()
for _, indexClient := range g.indexClients {
indexClient.Stop()
}
return nil
})
@ -77,11 +90,18 @@ func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registere
}
func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logproto.IndexGateway_QueryIndexServer) error {
var outerErr error
var innerErr error
log, _ := spanlogger.New(context.Background(), "IndexGateway.QueryIndex")
defer log.Finish()
var outerErr, innerErr error
queries := make([]index.Query, 0, len(request.Queries))
for _, query := range request.Queries {
if _, err := config.ExtractTableNumberFromName(query.TableName); err != nil {
level.Error(log).Log("msg", "skip querying table", "table", query.TableName, "err", err)
continue
}
queries = append(queries, index.Query{
TableName: query.TableName,
HashValue: query.HashValue,
@ -91,28 +111,53 @@ func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logprot
})
}
sort.Slice(queries, func(i, j int) bool {
ta, _ := config.ExtractTableNumberFromName(queries[i].TableName)
tb, _ := config.ExtractTableNumberFromName(queries[j].TableName)
return ta < tb
})
sendBatchMtx := sync.Mutex{}
outerErr = g.indexClient.QueryPages(server.Context(), queries, func(query index.Query, batch index.ReadBatchResult) bool {
innerErr = buildResponses(query, batch, func(response *logproto.QueryIndexResponse) error {
// do not send grpc responses concurrently. See https://github.com/grpc/grpc-go/blob/master/stream.go#L120-L123.
sendBatchMtx.Lock()
defer sendBatchMtx.Unlock()
for _, indexClient := range g.indexClients {
// find queries that can be handled by this index client.
start := sort.Search(len(queries), func(i int) bool {
tableNumber, _ := config.ExtractTableNumberFromName(queries[i].TableName)
return tableNumber >= indexClient.TableRange.Start
})
end := sort.Search(len(queries), func(j int) bool {
tableNumber, _ := config.ExtractTableNumberFromName(queries[j].TableName)
return tableNumber > indexClient.TableRange.End
})
if end-start <= 0 {
continue
}
outerErr = indexClient.QueryPages(server.Context(), queries[start:end], func(query index.Query, batch index.ReadBatchResult) bool {
innerErr = buildResponses(query, batch, func(response *logproto.QueryIndexResponse) error {
// do not send grpc responses concurrently. See https://github.com/grpc/grpc-go/blob/master/stream.go#L120-L123.
sendBatchMtx.Lock()
defer sendBatchMtx.Unlock()
return server.Send(response)
return server.Send(response)
})
if innerErr != nil {
return false
}
return true
})
if innerErr != nil {
return false
return innerErr
}
return true
})
if innerErr != nil {
return innerErr
if outerErr != nil {
return outerErr
}
}
return outerErr
return nil
}
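
Since `queries` are sorted by table number, the two `sort.Search` calls above locate the half-open window `[start, end)` of queries whose table numbers fall inside a client's `[Start, End]` range. A standalone illustration of that windowing, using the table numbers from the multistore test below:

```go
package main

import (
	"fmt"
	"sort"
)

// window finds the sub-slice of sorted table numbers that a client owning
// [rangeStart, rangeEnd] should handle, mirroring the gateway's logic.
func window(tableNumbers []int64, rangeStart, rangeEnd int64) (int, int) {
	start := sort.Search(len(tableNumbers), func(i int) bool {
		return tableNumbers[i] >= rangeStart
	})
	end := sort.Search(len(tableNumbers), func(i int) bool {
		return tableNumbers[i] > rangeEnd
	})
	return start, end
}

func main() {
	tables := []int64{6, 10, 12, 16, 99} // sorted table numbers from the queries
	start, end := window(tables, 5, 10)
	fmt.Println(tables[start:end]) // [6 10]
	start, end = window(tables, 15, int64(1)<<62)
	fmt.Println(tables[start:end]) // [16 99]
	start, end = window(tables, 0, 4)
	fmt.Println(tables[start:end]) // [] -> this client is skipped
}
```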
func buildResponses(query index.Query, batch index.ReadBatchResult, callback func(*logproto.QueryIndexResponse) error) error {

@ -3,14 +3,17 @@ package indexgateway
import (
"context"
"fmt"
"math"
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
util_math "github.com/grafana/loki/pkg/util/math"
)
@ -71,11 +74,13 @@ func (m *mockQueryIndexServer) Context() context.Context {
type mockIndexClient struct {
index.Client
response *mockBatch
response *mockBatch
tablesQueried []string
}
func (m mockIndexClient) QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error {
func (m *mockIndexClient) QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error {
for _, query := range queries {
m.tablesQueried = append(m.tablesQueried, query.TableName)
callback(query, m.response)
}
@ -127,7 +132,16 @@ func TestGateway_QueryIndex(t *testing.T) {
})
}
expectedQueryKey = util.QueryKey(query)
gateway.indexClient = mockIndexClient{response: &mockBatch{size: responseSize}}
gateway.indexClients = []IndexClientWithRange{{
IndexClient: &mockIndexClient{response: &mockBatch{size: responseSize}},
TableRange: config.TableRange{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: tableNamePrefix},
},
},
}}
err := gateway.QueryIndex(&logproto.QueryIndexRequest{Queries: []*logproto.IndexQuery{{
TableName: query.TableName,
@ -142,3 +156,85 @@ func TestGateway_QueryIndex(t *testing.T) {
require.Len(t, expectedRanges, 0)
}
}
func TestGateway_QueryIndex_multistore(t *testing.T) {
var (
responseSize = 99
expectedQueries []*logproto.IndexQuery
queries []*logproto.IndexQuery
)
var server logproto.IndexGateway_QueryIndexServer = &mockQueryIndexServer{
callback: func(resp *logproto.QueryIndexResponse) {
require.True(t, len(expectedQueries) > 0)
require.Equal(t, util.QueryKey(index.Query{
TableName: expectedQueries[0].TableName,
HashValue: expectedQueries[0].HashValue,
RangeValuePrefix: expectedQueries[0].RangeValuePrefix,
RangeValueStart: expectedQueries[0].RangeValueStart,
ValueEqual: expectedQueries[0].ValueEqual,
}), resp.QueryKey)
require.Len(t, resp.Rows, responseSize)
expectedQueries = expectedQueries[1:]
},
}
// builds queries for the listed tables
for _, i := range []int{6, 10, 12, 16, 99} {
queries = append(queries, &logproto.IndexQuery{
TableName: fmt.Sprintf("%s%d", tableNamePrefix, i),
HashValue: fmt.Sprintf("%s%d", hashValuePrefix, i),
RangeValuePrefix: []byte(fmt.Sprintf("%s%d", rangeValuePrefixPrefix, i)),
RangeValueStart: []byte(fmt.Sprintf("%s%d", rangeValueStartPrefix, i)),
ValueEqual: []byte(fmt.Sprintf("%s%d", valueEqualPrefix, i)),
})
}
indexClients := []IndexClientWithRange{{
IndexClient: &mockIndexClient{response: &mockBatch{size: responseSize}},
// no matching queries for this range
TableRange: config.TableRange{
Start: 0,
End: 4,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: tableNamePrefix},
},
},
}, {
IndexClient: &mockIndexClient{response: &mockBatch{size: responseSize}},
TableRange: config.TableRange{
Start: 5,
End: 10,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: tableNamePrefix},
},
},
}, {
IndexClient: &mockIndexClient{response: &mockBatch{size: responseSize}},
TableRange: config.TableRange{
Start: 15,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Prefix: tableNamePrefix},
},
},
}}
gateway, err := NewIndexGateway(Config{}, util_log.Logger, nil, nil, indexClients)
require.NoError(t, err)
expectedQueries = append(expectedQueries,
queries[3], queries[4], // queries matching table range 15->MaxInt64
queries[0], queries[1], // queries matching table range 5->10
)
err = gateway.QueryIndex(&logproto.QueryIndexRequest{Queries: queries}, server)
require.NoError(t, err)
// since indexClients are sorted, index 0 holds the client for the latest period
require.ElementsMatch(t, gateway.indexClients[0].IndexClient.(*mockIndexClient).tablesQueried, []string{"table-name16", "table-name99"})
require.ElementsMatch(t, gateway.indexClients[1].IndexClient.(*mockIndexClient).tablesQueried, []string{"table-name6", "table-name10"})
require.ElementsMatch(t, gateway.indexClients[2].IndexClient.(*mockIndexClient).tablesQueried, []string{})
require.Len(t, expectedQueries, 0)
}

@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
@ -19,7 +20,6 @@ import (
series_index "github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/index/indexfile"
util_log "github.com/grafana/loki/pkg/util/log"
)
type Config struct {
@ -55,32 +55,34 @@ type indexClient struct {
querier index.Querier
metrics *metrics
logger log.Logger
stopOnce sync.Once
}
// NewShipper creates a shipper for syncing local objects with a store
func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRanges config.TableRanges, registerer prometheus.Registerer) (series_index.Client, error) {
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (series_index.Client, error) {
i := indexClient{
cfg: cfg,
metrics: newMetrics(registerer),
logger: logger,
}
err := i.init(storageClient, limits, ownsTenantFn, tableRanges, registerer)
err := i.init(storageClient, limits, ownsTenantFn, tableRange, registerer)
if err != nil {
return nil, err
}
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %s mode", cfg.Mode))
level.Info(i.logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %s mode", cfg.Mode))
return &i, nil
}
func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRanges config.TableRanges, registerer prometheus.Registerer) error {
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRange config.TableRange, registerer prometheus.Registerer) error {
var err error
i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, ownsTenantFn,
indexfile.OpenIndexFile, tableRanges, prometheus.WrapRegistererWithPrefix("loki_boltdb_shipper_", registerer))
indexfile.OpenIndexFile, tableRange, prometheus.WrapRegistererWithPrefix("loki_boltdb_shipper_", registerer), i.logger)
if err != nil {
return err
}
@ -97,7 +99,7 @@ func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.L
DBRetainPeriod: i.cfg.IngesterDBRetainPeriod,
MakePerTenantBuckets: i.cfg.BuildPerTenantIndex,
}
i.writer, err = index.NewTableManager(cfg, i.indexShipper, registerer)
i.writer, err = index.NewTableManager(cfg, i.indexShipper, tableRange, registerer, i.logger)
if err != nil {
return err
}

@ -87,6 +87,7 @@ tsdb/
*/
type HeadManager struct {
name string
log log.Logger
dir string
metrics *Metrics
@ -110,9 +111,10 @@ type HeadManager struct {
cancel chan struct{}
}
func NewHeadManager(logger log.Logger, dir string, metrics *Metrics, tsdbManager TSDBManager) *HeadManager {
func NewHeadManager(name string, logger log.Logger, dir string, metrics *Metrics, tsdbManager TSDBManager) *HeadManager {
shards := defaultHeadManagerStripeSize
m := &HeadManager{
name: name,
log: log.With(logger, "component", "tsdb-head-manager"),
dir: dir,
metrics: metrics,
@ -252,23 +254,50 @@ func (m *HeadManager) Start() error {
return errors.Wrap(err, "removing tsdb scratch dir")
}
for _, d := range managerRequiredDirs(m.dir) {
for _, d := range managerRequiredDirs(m.name, m.dir) {
if err := util.EnsureDirectory(d); err != nil {
return errors.Wrapf(err, "ensuring required directory exists: %s", d)
}
}
walsByPeriod, err := walsByPeriod(m.dir, m.period)
if err != nil {
return err
// build tsdb from legacy WALs in the common wal dir; these would've been generated before TSDB multi-store support was added.
if err := m.buildTSDBFromWALs(true); err != nil {
return errors.Wrap(err, "building tsdb from legacy WAL files")
}
level.Info(m.log).Log("msg", "loaded wals by period", "groups", len(walsByPeriod))
// Load the shipper with any previously built TSDBs
if err := m.tsdbManager.Start(); err != nil {
return errors.Wrap(err, "failed to start tsdb manager")
}
// build tsdb from store-specific WAL files
if err := m.buildTSDBFromWALs(false); err != nil {
return errors.Wrap(err, "building tsdb from old WAL files")
}
err := m.Rotate(time.Now())
if err != nil {
return errors.Wrap(err, "rotating tsdb head")
}
m.wg.Add(1)
go m.loop()
return nil
}
func (m *HeadManager) buildTSDBFromWALs(legacy bool) error {
walDir := managerWalDir(m.name, m.dir)
if legacy {
walDir = managerLegacyWalDir(m.dir)
}
walsByPeriod, err := walsByPeriod(walDir, m.period)
if err != nil {
return errors.Wrap(err, "loading wals by period")
}
level.Info(m.log).Log("msg", "loaded wals by period", "groups", len(walsByPeriod))
// Build any old WALs into a TSDB for the shipper
var allWALs []WALIdentifier
for _, group := range walsByPeriod {
@ -279,31 +308,32 @@ func (m *HeadManager) Start() error {
if err := m.tsdbManager.BuildFromWALs(
now,
allWALs,
legacy,
); err != nil {
return errors.Wrap(err, "building tsdb")
return errors.Wrap(err, "building tsdb from WALs")
}
if err := os.RemoveAll(managerWalDir(m.dir)); err != nil {
m.metrics.walTruncations.WithLabelValues(statusFailure).Inc()
return errors.New("cleaning (removing) wal dir")
}
m.metrics.walTruncations.WithLabelValues(statusSuccess).Inc()
err = m.Rotate(now)
if err != nil {
return errors.Wrap(err, "rotating tsdb head")
if legacy {
for _, grp := range walsByPeriod {
if err := m.removeLegacyWALGroup(grp); err != nil {
return errors.Wrapf(err, "removing legacy TSDB WALs for period %d", grp.period)
}
}
} else {
if err := os.RemoveAll(managerWalDir(m.name, m.dir)); err != nil {
m.metrics.walTruncations.WithLabelValues(statusFailure).Inc()
return errors.Wrap(err, "cleaning (removing) wal dir")
}
m.metrics.walTruncations.WithLabelValues(statusSuccess).Inc()
}
m.wg.Add(1)
go m.loop()
return nil
}
func managerRequiredDirs(parent string) []string {
func managerRequiredDirs(name, parent string) []string {
return []string{
managerScratchDir(parent),
managerWalDir(parent),
managerWalDir(name, parent),
managerMultitenantDir(parent),
managerPerTenantDir(parent),
}
@ -312,7 +342,11 @@ func managerScratchDir(parent string) string {
return filepath.Join(parent, "scratch")
}
func managerWalDir(parent string) string {
func managerWalDir(name, parent string) string {
return filepath.Join(parent, "wal", name)
}
func managerLegacyWalDir(parent string) string {
return filepath.Join(parent, "wal")
}
@ -326,7 +360,7 @@ func managerPerTenantDir(parent string) string {
func (m *HeadManager) Rotate(t time.Time) (err error) {
// create new wal
nextWALPath := walPath(m.dir, t)
nextWALPath := walPath(m.name, m.dir, t)
nextWAL, err := newHeadWAL(m.log, nextWALPath, t)
if err != nil {
return errors.Wrapf(err, "creating tsdb wal: %s during rotation", nextWALPath)
@ -385,7 +419,7 @@ func (m *HeadManager) truncateWALForPeriod(period int) (err error) {
m.metrics.walTruncations.WithLabelValues(status).Inc()
}()
grp, _, err := walsForPeriod(m.dir, m.period, period)
grp, _, err := walsForPeriod(managerWalDir(m.name, m.dir), m.period, period)
if err != nil {
return errors.Wrap(err, "listing wals")
}
@ -405,7 +439,6 @@ type WalGroup struct {
}
func walsByPeriod(dir string, period period) ([]WalGroup, error) {
groupsMap, err := walGroups(dir, period)
if err != nil {
return nil, err
@ -422,7 +455,7 @@ func walsByPeriod(dir string, period period) ([]WalGroup, error) {
}
func walGroups(dir string, period period) (map[int]*WalGroup, error) {
files, err := os.ReadDir(managerWalDir(dir))
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
@ -468,27 +501,48 @@ func walsForPeriod(dir string, period period, offset int) (WalGroup, bool, error
func (m *HeadManager) removeWALGroup(grp WalGroup) error {
for _, wal := range grp.wals {
if err := os.RemoveAll(walPath(m.dir, wal.ts)); err != nil {
return errors.Wrapf(err, "removing tsdb wal: %s", walPath(m.dir, wal.ts))
if err := os.RemoveAll(walPath(m.name, m.dir, wal.ts)); err != nil {
return errors.Wrapf(err, "removing tsdb wal: %s", walPath(m.name, m.dir, wal.ts))
}
}
return nil
}
func walPath(parent string, t time.Time) string {
func (m *HeadManager) removeLegacyWALGroup(grp WalGroup) error {
for _, wal := range grp.wals {
if err := os.RemoveAll(legacyWalPath(m.dir, wal.ts)); err != nil {
return errors.Wrapf(err, "removing tsdb wal: %s", legacyWalPath(m.dir, wal.ts))
}
}
return nil
}
func walPath(name, parent string, t time.Time) string {
return filepath.Join(
managerWalDir(parent),
managerWalDir(name, parent),
fmt.Sprintf("%d", t.Unix()),
)
}
func legacyWalPath(parent string, t time.Time) string {
return filepath.Join(
managerLegacyWalDir(parent),
fmt.Sprintf("%d", t.Unix()),
)
}
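
Taken together, the two helpers give each named store its own WAL subdirectory while keeping the legacy location readable for migration. A tiny illustration of the resulting layout (paths are examples):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// same shape as the helpers above
func managerWalDir(name, parent string) string { return filepath.Join(parent, "wal", name) }
func managerLegacyWalDir(parent string) string { return filepath.Join(parent, "wal") }

func main() {
	fmt.Println(managerLegacyWalDir("/data/tsdb"))               // /data/tsdb/wal
	fmt.Println(managerWalDir("store_2023-01-02", "/data/tsdb")) // /data/tsdb/wal/store_2023-01-02
}
```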
// recoverHead recovers from all WALs belonging to some period
// and inserts them into the active *tenantHeads
func recoverHead(dir string, heads *tenantHeads, wals []WALIdentifier) error {
func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, legacy bool) error {
for _, id := range wals {
// use anonymous function for ease of cleanup
if err := func(id WALIdentifier) error {
reader, closer, err := wal.NewWalReader(walPath(dir, id.ts), -1)
walPath := walPath(name, dir, id.ts)
if legacy {
walPath = legacyWalPath(dir, id.ts)
}
reader, closer, err := wal.NewWalReader(walPath, -1)
if err != nil {
return err
}

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"path/filepath"
"sync"
"testing"
"time"
@ -15,17 +16,26 @@ import (
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/grafana/loki/pkg/validation"
)
type noopTSDBManager struct {
dir string
name string
dir string
*tenantHeads
}
func newNoopTSDBManager(dir string) noopTSDBManager {
func newNoopTSDBManager(name, dir string) noopTSDBManager {
return noopTSDBManager{
name: name,
dir: dir,
tenantHeads: newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger()),
}
@ -35,11 +45,24 @@ func (m noopTSDBManager) BuildFromHead(_ *tenantHeads) error {
panic("BuildFromHead not implemented")
}
func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier) error {
return recoverHead(m.dir, m.tenantHeads, wals)
func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier, _ bool) error {
return recoverHead(m.name, m.dir, m.tenantHeads, wals, false)
}
func (m noopTSDBManager) Start() error { return nil }
type zeroValueLimits struct {
}
func (m *zeroValueLimits) AllByUserID() map[string]*validation.Limits {
return nil
}
func (m *zeroValueLimits) DefaultLimits() *validation.Limits {
return &validation.Limits{
QueryReadyIndexNumDays: 0,
}
}
func chunkMetasToChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []ChunkRef) {
for _, x := range xs {
res = append(res, ChunkRef{
@ -53,6 +76,19 @@ func chunkMetasToChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []C
return
}
func chunkMetasToLogProtoChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []logproto.ChunkRef) {
for _, x := range xs {
res = append(res, logproto.ChunkRef{
UserID: user,
Fingerprint: fp,
From: model.TimeFromUnix(x.From().Unix()),
Through: model.TimeFromUnix(x.Through().Unix()),
Checksum: x.Checksum,
})
}
return
}
// Test append
func Test_TenantHeads_Append(t *testing.T) {
h := newTenantHeads(time.Now(), defaultHeadManagerStripeSize, NewMetrics(nil), log.NewNopLogger())
@ -173,10 +209,11 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
},
}
mgr := NewHeadManager(log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(dir))
storeName := "store_2010-10-10"
mgr := NewHeadManager(storeName, log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(storeName, dir))
// This bit is normally handled by the Start() fn, but we're testing a smaller surface area
// so ensure our dirs exist
for _, d := range managerRequiredDirs(dir) {
for _, d := range managerRequiredDirs(storeName, dir) {
require.Nil(t, util.EnsureDirectory(d))
}
@ -184,7 +221,7 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
require.Nil(t, mgr.Rotate(now))
// now build a WAL independently to test recovery
w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.dir, now), now)
w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.name, mgr.dir, now), now)
require.Nil(t, err)
for i, c := range cases {
@ -204,11 +241,11 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
require.Nil(t, w.Stop())
grp, ok, err := walsForPeriod(mgr.dir, mgr.period, mgr.period.PeriodFor(now))
grp, ok, err := walsForPeriod(managerWalDir(mgr.name, mgr.dir), mgr.period, mgr.period.PeriodFor(now))
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, 1, len(grp.wals))
require.Nil(t, recoverHead(mgr.dir, mgr.activeHeads, grp.wals))
require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false))
for _, c := range cases {
refs, err := mgr.GetChunkRefs(
@ -257,8 +294,9 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
},
}
mgr := NewHeadManager(log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(dir))
w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.dir, curPeriod), curPeriod)
storeName := "store_2010-10-10"
mgr := NewHeadManager(storeName, log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(storeName, dir))
w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.name, mgr.dir, curPeriod), curPeriod)
require.Nil(t, err)
// Write old WALs
@ -336,6 +374,136 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
}
}
func TestBuildLegacyWALs(t *testing.T) {
dir := t.TempDir()
secondStoreDate := parseDate("2023-01-02")
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
{
From: config.DayTime{Time: timeToModelTime(secondStoreDate)},
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
},
},
},
}
c := struct {
Labels labels.Labels
Fingerprint uint64
Chunks []index.ChunkMeta
User string
}{
User: "tenant1",
Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(),
Chunks: []index.ChunkMeta{
{
MinTime: secondStoreDate.Add(-36 * time.Hour).UnixMilli(),
MaxTime: secondStoreDate.Add(-24 * time.Hour).UnixMilli(),
Checksum: 3,
},
// chunk overlapping the period boundary
{
MinTime: secondStoreDate.Add(-1 * time.Hour).UnixMilli(),
MaxTime: secondStoreDate.Add(1 * time.Hour).UnixMilli(),
Checksum: 3,
},
{
MinTime: secondStoreDate.Add(24 * time.Hour).UnixMilli(),
MaxTime: secondStoreDate.Add(36 * time.Hour).UnixMilli(),
Checksum: 3,
},
},
}
// populate WAL file with chunks from two different periods
now := time.Now()
w, err := newHeadWAL(log.NewNopLogger(), legacyWalPath(dir, now), now)
require.Nil(t, err)
require.Nil(t, w.Log(&WALRecord{
UserID: c.User,
Fingerprint: c.Fingerprint,
Series: record.RefSeries{
Ref: chunks.HeadSeriesRef(123),
Labels: c.Labels,
},
Chks: ChunkMetasRecord{
Chks: c.Chunks,
Ref: uint64(123),
},
}))
require.Nil(t, w.Stop())
fsObjectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: filepath.Join(dir, "fs_store")})
require.Nil(t, err)
shipperCfg := indexshipper.Config{}
flagext.DefaultValues(&shipperCfg)
shipperCfg.Mode = indexshipper.ModeReadWrite
shipperCfg.ActiveIndexDirectory = filepath.Join(dir)
shipperCfg.CacheLocation = filepath.Join(dir, "cache")
for _, tc := range []struct {
name, store string
tableRange config.TableRange
expectedChunks []logproto.ChunkRef
}{
{
name: "query-period-1",
store: "period-1",
tableRange: schemaCfg.Configs[0].GetIndexTableNumberRange(config.DayTime{Time: timeToModelTime(secondStoreDate.Add(-time.Millisecond))}),
expectedChunks: chunkMetasToLogProtoChunkRefs(c.User, c.Labels.Hash(), c.Chunks[:2]),
},
{
name: "query-period-2",
store: "period-2",
tableRange: schemaCfg.Configs[1].GetIndexTableNumberRange(config.DayTime{Time: math.MaxInt64}),
expectedChunks: chunkMetasToLogProtoChunkRefs(c.User, c.Labels.Hash(), c.Chunks[1:]),
},
} {
t.Run(tc.name, func(t *testing.T) {
store, stop, err := NewStore(tc.store, shipperCfg, schemaCfg, nil, fsObjectClient, &zeroValueLimits{}, tc.tableRange, nil, nil, log.NewNopLogger())
require.Nil(t, err)
refs, err := store.GetChunkRefs(
context.Background(),
c.User,
0, timeToModelTime(secondStoreDate.Add(48*time.Hour)),
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
)
require.Nil(t, err)
require.Equal(t, tc.expectedChunks, refs)
stop()
})
}
}
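
Note that both sub-tests expect the boundary-spanning chunk (`c.Chunks[1]`): a chunk belongs to every period that its `[MinTime, MaxTime]` interval intersects, so it is indexed by both stores. A minimal sketch of that overlap reasoning (timestamps illustrative):

```go
package main

import "fmt"

type interval struct{ from, through int64 } // Unix millis, closed interval

// overlaps reports whether two closed intervals intersect.
func overlaps(a, b interval) bool { return a.from <= b.through && b.from <= a.through }

func main() {
	boundary := int64(1672617600000) // 2023-01-02T00:00:00Z
	period1 := interval{0, boundary - 1}
	period2 := interval{boundary, int64(1) << 62}
	chunk := interval{boundary - 3600000, boundary + 3600000} // -1h .. +1h around the boundary
	fmt.Println(overlaps(chunk, period1))                     // true
	fmt.Println(overlaps(chunk, period2))                     // true
}
```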
func parseDate(in string) time.Time {
t, err := time.Parse("2006-01-02", in)
if err != nil {
panic(err)
}
return t
}
func timeToModelTime(t time.Time) model.Time {
return model.TimeFromUnixNano(t.UnixNano())
}
func BenchmarkTenantHeads(b *testing.B) {
for _, tc := range []struct {
readers, writers int

@ -35,14 +35,12 @@ func (m mockIndexShipperIndexIterator) ForEach(ctx context.Context, tableName, u
func BenchmarkIndexClient_Stats(b *testing.B) {
tempDir := b.TempDir()
tableRanges := config.TableRanges{
{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Period: config.ObjectStorageIndexRequiredPeriod,
},
tableRange := config.TableRange{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Period: config.ObjectStorageIndexRequiredPeriod,
},
},
}
@ -51,7 +49,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) {
indexStartYesterday := indexStartToday.Add(-config.ObjectStorageIndexRequiredPeriod)
tables := map[string][]*TSDBFile{
tableRanges[0].PeriodConfig.IndexTables.TableFor(indexStartToday): {
tableRange.PeriodConfig.IndexTables.TableFor(indexStartToday): {
BuildIndex(b, tempDir, []LoadableSeries{
{
Labels: mustParseLabels(`{foo="bar"}`),
@ -60,7 +58,7 @@ func BenchmarkIndexClient_Stats(b *testing.B) {
}),
},
tableRanges[0].PeriodConfig.IndexTables.TableFor(indexStartYesterday): {
tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): {
BuildIndex(b, tempDir, []LoadableSeries{
{
Labels: mustParseLabels(`{foo="bar"}`),
@ -70,12 +68,10 @@ func BenchmarkIndexClient_Stats(b *testing.B) {
},
}
idx := newIndexShipperQuerier(mockIndexShipperIndexIterator{tables: tables}, config.TableRanges{
{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{},
},
idx := newIndexShipperQuerier(mockIndexShipperIndexIterator{tables: tables}, config.TableRange{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{},
})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true})
@ -93,14 +89,12 @@ func BenchmarkIndexClient_Stats(b *testing.B) {
func TestIndexClient_Stats(t *testing.T) {
tempDir := t.TempDir()
tableRanges := config.TableRanges{
{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Period: config.ObjectStorageIndexRequiredPeriod,
},
tableRange := config.TableRange{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{
Period: config.ObjectStorageIndexRequiredPeriod,
},
},
}
@ -109,7 +103,7 @@ func TestIndexClient_Stats(t *testing.T) {
indexStartYesterday := indexStartToday.Add(-config.ObjectStorageIndexRequiredPeriod)
tables := map[string][]*TSDBFile{
tableRanges[0].PeriodConfig.IndexTables.TableFor(indexStartToday): {
tableRange.PeriodConfig.IndexTables.TableFor(indexStartToday): {
BuildIndex(t, tempDir, []LoadableSeries{
{
Labels: mustParseLabels(`{foo="bar"}`),
@ -122,7 +116,7 @@ func TestIndexClient_Stats(t *testing.T) {
}),
},
tableRanges[0].PeriodConfig.IndexTables.TableFor(indexStartYesterday): {
tableRange.PeriodConfig.IndexTables.TableFor(indexStartYesterday): {
BuildIndex(t, tempDir, []LoadableSeries{
{
Labels: mustParseLabels(`{foo="bar"}`),
@@ -140,12 +134,10 @@ func TestIndexClient_Stats(t *testing.T) {
},
}
idx := newIndexShipperQuerier(mockIndexShipperIndexIterator{tables: tables}, config.TableRanges{
{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{},
},
idx := newIndexShipperQuerier(mockIndexShipperIndexIterator{tables: tables}, config.TableRange{
Start: 0,
End: math.MaxInt64,
PeriodConfig: &config.PeriodConfig{},
})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true})
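Both the benchmark and the test capture the PR's core shape change: a TSDB querier is now scoped to a single `config.TableRange` rather than a `config.TableRanges` slice. A hedged sketch of how callers can derive one range per period, using `config.GetIndexStoreTableRanges` (which also appears in the manager changes below); `shipperForRange` is a hypothetical factory:

```go
// One TableRange per TSDB period config: each range gets its own
// index-shipper and querier instead of one shipper spanning all periods.
for _, r := range config.GetIndexStoreTableRanges(config.TSDBType, schemaCfg.Configs) {
	idx := newIndexShipperQuerier(shipperForRange(r), r) // shipperForRange: hypothetical
	_ = idx
}
```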

@@ -24,11 +24,11 @@ type indexShipperIterator interface {
type indexShipperQuerier struct {
shipper indexShipperIterator
chunkFilter chunk.RequestChunkFilterer
tableRanges config.TableRanges
tableRange config.TableRange
}
func newIndexShipperQuerier(shipper indexShipperIterator, tableRanges config.TableRanges) Index {
return &indexShipperQuerier{shipper: shipper, tableRanges: tableRanges}
func newIndexShipperQuerier(shipper indexShipperIterator, tableRange config.TableRange) Index {
return &indexShipperQuerier{shipper: shipper, tableRange: tableRange}
}
type indexIterFunc func(func(context.Context, Index) error) error
@@ -40,7 +40,7 @@ func (i indexIterFunc) For(ctx context.Context, f func(context.Context, Index) e
func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) {
itr := indexIterFunc(func(f func(context.Context, Index) error) error {
// Ensure we query both per-tenant and multitenant TSDBs
idxBuckets := indexBuckets(from, through, i.tableRanges)
idxBuckets := indexBuckets(from, through, []config.TableRange{i.tableRange})
for _, bkt := range idxBuckets {
if err := i.shipper.ForEachConcurrent(ctx, bkt, user, func(multitenant bool, idx shipper_index.Index) error {
impl, ok := idx.(Index)
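At query time the single range is wrapped back into a slice only at the `indexBuckets` call site. A sketch of the resolution step, assuming `indexBuckets` yields one table name per index period overlapped by the query window:

```go
// Resolve the tables overlapping [from, through] within this querier's
// range, then visit per-tenant and multitenant indexes for each table.
for _, bkt := range indexBuckets(from, through, []config.TableRange{i.tableRange}) {
	_ = bkt // i.shipper.ForEachConcurrent(ctx, bkt, user, ...) as in the diff above
}
```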

@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"sync"
"time"
@@ -28,7 +27,7 @@ import (
type TSDBManager interface {
Start() error
// Builds new TSDB files from a set of WALs; the trailing bool marks legacy WALs that are not period-specific
BuildFromWALs(time.Time, []WALIdentifier) error
BuildFromWALs(time.Time, []WALIdentifier, bool) error
// Builds a new TSDB file from tenantHeads
BuildFromHead(*tenantHeads) error
}
@@ -42,11 +41,13 @@ tsdbManager is used for managing active index and is responsible for:
- Removing old TSDBs which are no longer needed
*/
type tsdbManager struct {
nodeName string // node name
log log.Logger
dir string
metrics *Metrics
tableRanges config.TableRanges
nodeName string // node name
log log.Logger
name string
dir string
metrics *Metrics
tableRange config.TableRange
schemaCfg config.SchemaConfig
sync.RWMutex
@@ -54,20 +55,24 @@ type tsdbManager struct {
}
func NewTSDBManager(
name,
nodeName,
dir string,
shipper indexshipper.IndexShipper,
tableRanges config.TableRanges,
tableRange config.TableRange,
schemaCfg config.SchemaConfig,
logger log.Logger,
metrics *Metrics,
) TSDBManager {
return &tsdbManager{
nodeName: nodeName,
log: log.With(logger, "component", "tsdb-manager"),
dir: dir,
metrics: metrics,
tableRanges: tableRanges,
shipper: shipper,
name: name,
nodeName: nodeName,
log: log.With(logger, "component", "tsdb-manager"),
dir: dir,
metrics: metrics,
tableRange: tableRange,
schemaCfg: schemaCfg,
shipper: shipper,
}
}
@@ -87,12 +92,6 @@ func (m *tsdbManager) Start() (err error) {
)
}()
// regexp for finding the trailing index bucket number at the end of table name
extractBucketNumberRegex, err := regexp.Compile(`[0-9]+$`)
if err != nil {
return err
}
// load list of multitenant tsdbs
multitenantDir := managerMultitenantDir(m.dir)
files, err := os.ReadDir(multitenantDir)
@@ -106,12 +105,8 @@ func (m *tsdbManager) Start() (err error) {
}
bucket := f.Name()
if !extractBucketNumberRegex.MatchString(f.Name()) {
level.Warn(m.log).Log(
"msg", "directory name does not match expected bucket name pattern",
"name", bucket,
"err", err.Error(),
)
if ok, err := m.tableRange.TableInRange(bucket); !ok {
level.Info(m.log).Log("msg", fmt.Sprintf("skip loading, table not in range: %s", f.Name()), "reason", err)
continue
}
buckets++
@@ -156,7 +151,7 @@ func (m *tsdbManager) Start() (err error) {
return nil
}
func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
func (m *tsdbManager) buildFromHead(heads *tenantHeads, shipper indexshipper.IndexShipper, tableRanges []config.TableRange) (err error) {
periods := make(map[string]*Builder)
if err := heads.forAll(func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error {
@@ -164,7 +159,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
// chunks may overlap index period bounds, in which case they're written to multiple tables
pds := make(map[string]index.ChunkMetas)
for _, chk := range chks {
idxBuckets := indexBuckets(chk.From(), chk.Through(), m.tableRanges)
idxBuckets := indexBuckets(chk.From(), chk.Through(), tableRanges)
for _, bucket := range idxBuckets {
pds[bucket] = append(pds[bucket], chk)
@@ -215,7 +210,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
start := time.Now()
_, err = b.Build(
context.Background(),
managerScratchDir(m.dir),
filepath.Join(managerScratchDir(m.dir), m.name),
func(from, through model.Time, checksum uint32) Identifier {
return dst
},
@@ -231,7 +226,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads) (err error) {
return err
}
if err := m.shipper.AddIndex(p, "", loaded); err != nil {
if err := shipper.AddIndex(p, "", loaded); err != nil {
return err
}
}
@@ -251,10 +246,10 @@ func (m *tsdbManager) BuildFromHead(heads *tenantHeads) (err error) {
m.metrics.tsdbBuilds.WithLabelValues(status, "head").Inc()
}()
return m.buildFromHead(heads)
return m.buildFromHead(heads, m.shipper, []config.TableRange{m.tableRange})
}
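The fan-out in `buildFromHead` above is worth spelling out: a chunk whose span crosses an index-period boundary is appended to every table bucket it overlaps, so a single chunk may be indexed by several per-period TSDBs. Condensed from the hunk above:

```go
// Group chunk metadata by destination table; a boundary-crossing chunk
// appears under every table it overlaps.
pds := make(map[string]index.ChunkMetas)
for _, chk := range chks {
	for _, bucket := range indexBuckets(chk.From(), chk.Through(), tableRanges) {
		pds[bucket] = append(pds[bucket], chk)
	}
}
```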
func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) {
func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy bool) (err error) {
level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t)
defer func() {
status := statusSuccess
@@ -265,14 +260,28 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error
m.metrics.tsdbBuilds.WithLabelValues(status, "wal").Inc()
}()
var (
tableRanges = []config.TableRange{m.tableRange}
shipper = m.shipper
)
if legacy {
// Pass all TSDB table ranges, since legacy WAL files are not period-specific.
tableRanges = config.GetIndexStoreTableRanges(config.TSDBType, m.schemaCfg.Configs)
// Do not ship legacy WAL files; TSDBs built from them are loaded when the tsdbManager starts.
shipper = indexshipper.Noop{}
}
level.Debug(m.log).Log("msg", "recovering tenant heads")
for _, id := range ids {
tmp := newTenantHeads(id.ts, defaultHeadManagerStripeSize, m.metrics, m.log)
if err = recoverHead(m.dir, tmp, []WALIdentifier{id}); err != nil {
if err = recoverHead(m.name, m.dir, tmp, []WALIdentifier{id}, legacy); err != nil {
return errors.Wrap(err, "building TSDB from WALs")
}
err := m.buildFromHead(tmp)
err := m.buildFromHead(tmp, shipper, tableRanges)
if err != nil {
return err
}
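The legacy branch, condensed: legacy WALs predate per-period managers, so the TSDBs built from them target every TSDB table range but are never uploaded, relying instead on the next `Start()` to load them locally. This assumes `indexshipper.Noop` satisfies `indexshipper.IndexShipper` with no-op methods, as the diff implies:

```go
// Defaults: this manager's own range and its real shipper.
tableRanges := []config.TableRange{m.tableRange}
shipper := m.shipper
if legacy {
	// Legacy WALs are not period-specific: build against every TSDB
	// range and drop AddIndex calls so nothing is uploaded for them.
	tableRanges = config.GetIndexStoreTableRanges(config.TSDBType, m.schemaCfg.Configs)
	shipper = indexshipper.Noop{}
}
```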

@@ -6,6 +6,7 @@ import (
"math"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -20,7 +21,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads"
tsdb_index "github.com/grafana/loki/pkg/storage/stores/tsdb/index"
util_log "github.com/grafana/loki/pkg/util/log"
)
type IndexWriter interface {
@@ -32,39 +32,45 @@ type store struct {
indexShipper indexshipper.IndexShipper
indexWriter IndexWriter
backupIndexWriter index.Writer
logger log.Logger
stopOnce sync.Once
}
// NewStore creates a new TSDB store.
// This is meant to be a singleton and should be instantiated only once per storage.Store and reused for all schema configs.
// We do not need to build store for each schema config since we do not do any schema specific handling yet.
// If we do need to do schema specific handling, it would be a good idea to abstract away the handling since
// running multiple head managers would be complicated and wasteful.
// Note: The cmd/migrate tool needs this not to be a true global singleton
// as it will create multiple storage.Store instances in the same process.
func NewStore(indexShipperCfg indexshipper.Config,
p config.PeriodConfig,
// NewStore creates a new tsdb index ReaderWriter.
func NewStore(
name string,
indexShipperCfg indexshipper.Config,
schemaCfg config.SchemaConfig,
f *fetcher.Fetcher,
objectClient client.ObjectClient,
limits downloads.Limits,
tableRanges config.TableRanges,
tableRange config.TableRange,
backupIndexWriter index.Writer,
reg prometheus.Registerer) (index.ReaderWriter, func(), error) {
reg prometheus.Registerer,
logger log.Logger,
) (
index.ReaderWriter,
func(),
error,
) {
if backupIndexWriter == nil {
backupIndexWriter = noopBackupIndexWriter{}
}
storeInstance := &store{
backupIndexWriter: backupIndexWriter,
logger: logger,
}
err := storeInstance.init(indexShipperCfg, objectClient, limits, tableRanges, reg)
if err != nil {
if err := storeInstance.init(name, indexShipperCfg, schemaCfg, objectClient, limits, tableRange, reg); err != nil {
return nil, nil, err
}
return storeInstance, storeInstance.Stop, nil
}
func (s *store) init(indexShipperCfg indexshipper.Config, objectClient client.ObjectClient,
limits downloads.Limits, tableRanges config.TableRanges, reg prometheus.Registerer) error {
func (s *store) init(name string, indexShipperCfg indexshipper.Config, schemaCfg config.SchemaConfig, objectClient client.ObjectClient,
limits downloads.Limits, tableRange config.TableRange, reg prometheus.Registerer) error {
var err error
s.indexShipper, err = indexshipper.NewIndexShipper(
@@ -73,8 +79,9 @@ func (s *store) init(indexShipperCfg indexshipper.Config, objectClient client.Ob
limits,
nil,
OpenShippableTSDB,
tableRanges,
tableRange,
prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", reg),
s.logger,
)
if err != nil {
return err
@@ -94,7 +101,6 @@ func (s *store) init(indexShipperCfg indexshipper.Config, objectClient client.Ob
}
if indexShipperCfg.Mode != indexshipper.ModeReadOnly {
dir := indexShipperCfg.ActiveIndexDirectory
nodeName, err := indexShipperCfg.GetUniqueUploaderName()
if err != nil {
return err
@@ -102,17 +108,20 @@ func (s *store) init(indexShipperCfg indexshipper.Config, objectClient client.Ob
tsdbMetrics := NewMetrics(reg)
tsdbManager := NewTSDBManager(
name,
nodeName,
dir,
indexShipperCfg.ActiveIndexDirectory,
s.indexShipper,
tableRanges,
util_log.Logger,
tableRange,
schemaCfg,
s.logger,
tsdbMetrics,
)
headManager := NewHeadManager(
util_log.Logger,
dir,
name,
s.logger,
indexShipperCfg.ActiveIndexDirectory,
tsdbMetrics,
tsdbManager,
)
@@ -126,7 +135,7 @@ func (s *store) init(indexShipperCfg indexshipper.Config, objectClient client.Ob
s.indexWriter = failingIndexWriter{}
}
indices = append(indices, newIndexShipperQuerier(s.indexShipper, tableRanges))
indices = append(indices, newIndexShipperQuerier(s.indexShipper, tableRange))
multiIndex := NewMultiIndex(IndexSlice(indices))
s.Reader = NewIndexClient(multiIndex, opts)
@@ -138,7 +147,7 @@ func (s *store) Stop() {
s.stopOnce.Do(func() {
if hm, ok := s.indexWriter.(*HeadManager); ok {
if err := hm.Stop(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to stop head manager", "err", err)
level.Error(s.logger).Log("msg", "failed to stop head manager", "err", err)
}
}
s.indexShipper.Stop()
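Finally, the wider wiring the new `NewStore` signature enables: per the PR description, the storage factory constructs one of these stores per period when no shared store is pinned, exactly as the test at the top of this section does. A minimal sketch, with a hypothetical `tableRangeFor` helper and error handling elided:

```go
// One store, and therefore one index-shipper and head manager, per period.
rw, stop, err := NewStore(
	periodCfg.IndexType+"_"+periodCfg.From.String(), // assumed per-period naming
	shipperCfg, schemaCfg,
	nil, // *fetcher.Fetcher, as in the test above
	objectClient, limits,
	tableRangeFor(periodCfg), // hypothetical: this period's TableRange
	nil,                      // backupIndexWriter; nil falls back to the noop writer
	prometheus.DefaultRegisterer,
	log.NewNopLogger(),
)
```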
