Add 3rd target (`backend`) to SSD/Scalable mode (#7650)

**What this PR does / why we need it**:

This adds a 3rd target (`backend`) to SSD/Scalable mode, along with a
config flag that keeps the legacy `read` mode of the original 2-target
configuration available, so that people have time to migrate before we
remove this option in Loki 3.0 (hopefully).

The 3rd target has two major advantages:
1. It allows the read path to be run as a deployment and thus auto-scaled
using our existing auto-scaling logic for queriers.
2. It creates consistency with Mimir, since they went with a 3-target model
for their SSD deployment.
pull/7921/head
Trevor Whitney 3 years ago committed by GitHub
parent c4eb8c87ed
commit f5fbfabd84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 75
      integration/loki_simple_scalable_test.go
  2. 29
      pkg/loki/loki.go
  3. 23
      pkg/loki/modules.go
  4. 181
      pkg/loki/modules_test.go

@ -12,7 +12,7 @@ import (
"github.com/grafana/loki/integration/cluster"
)
func TestSimpleScalableIngestQuery(t *testing.T) {
func TestSimpleScalable_Legacy_IngestQuery(t *testing.T) {
clu := cluster.New()
defer func() {
assert.NoError(t, clu.Cleanup())
@ -74,3 +74,76 @@ func TestSimpleScalableIngestQuery(t *testing.T) {
assert.ElementsMatch(t, []string{"fake"}, resp)
})
}
// TestSimpleScalable_IngestQuery runs a cluster made of write, backend and
// read components (with legacy read mode switched off on backend and read),
// pushes four log lines through the write component, and then checks that a
// range query, the label names and the label values returned by the read
// component match what was ingested.
func TestSimpleScalable_IngestQuery(t *testing.T) {
	clu := cluster.New()
	defer func() {
		assert.NoError(t, clu.Cleanup())
	}()

	writeTarget := clu.AddComponent("write", "-target=write")
	backendTarget := clu.AddComponent(
		"backend",
		"-target=backend",
		"-legacy-read-mode=false",
	)
	require.NoError(t, clu.Run())

	// The read component is registered only after the first Run because its
	// compactor address flag is built from the backend's HTTP URL.
	// NOTE(review): presumably that URL is only final once the backend has
	// been started -- confirm against the cluster package.
	readTarget := clu.AddComponent(
		"read",
		"-target=read",
		"-common.compactor-address="+backendTarget.HTTPURL(),
		"-legacy-read-mode=false",
	)
	require.NoError(t, clu.Run())

	tenantID := randStringRunes()
	now := time.Now()

	// One client per component, all pinned to the same notion of "now".
	cliWrite := client.New(tenantID, "", writeTarget.HTTPURL())
	cliWrite.Now = now
	cliRead := client.New(tenantID, "", readTarget.HTTPURL())
	cliRead.Now = now
	cliBackend := client.New(tenantID, "", backendTarget.HTTPURL())
	cliBackend.Now = now

	t.Run("ingest logs", func(t *testing.T) {
		// Every push gets its own freshly built label map.
		fakeJob := func() map[string]string {
			return map[string]string{"job": "fake"}
		}
		past := now.Add(-45 * time.Minute)

		// Two lines with an explicit timestamp 45 minutes in the past...
		require.NoError(t, cliWrite.PushLogLineWithTimestamp("lineA", past, fakeJob()))
		require.NoError(t, cliWrite.PushLogLineWithTimestamp("lineB", past, fakeJob()))
		// ...and two lines pushed without an explicit timestamp.
		require.NoError(t, cliWrite.PushLogLine("lineC", fakeJob()))
		require.NoError(t, cliWrite.PushLogLine("lineD", fakeJob()))
	})

	t.Run("query", func(t *testing.T) {
		ctx := context.Background()
		resp, err := cliRead.RunRangeQuery(ctx, `{job="fake"}`)
		require.NoError(t, err)
		assert.Equal(t, "streams", resp.Data.ResultType)

		// Flatten all returned streams into the second field of each value,
		// which is what gets compared with the pushed lines below.
		var lines []string
		for _, s := range resp.Data.Stream {
			for _, entry := range s.Values {
				lines = append(lines, entry[1])
			}
		}
		expected := []string{"lineA", "lineB", "lineC", "lineD"}
		assert.ElementsMatch(t, expected, lines)
	})

	t.Run("label-names", func(t *testing.T) {
		names, err := cliRead.LabelNames(context.Background())
		require.NoError(t, err)
		assert.ElementsMatch(t, []string{"job"}, names)
	})

	t.Run("label-values", func(t *testing.T) {
		values, err := cliRead.LabelValues(context.Background(), "job")
		require.NoError(t, err)
		assert.ElementsMatch(t, []string{"fake"}, values)
	})
}

@ -71,6 +71,8 @@ type Config struct {
UseBufferedLogger bool `yaml:"use_buffered_logger"`
UseSyncLogger bool `yaml:"use_sync_logger"`
LegacyReadTarget bool `yaml:"legacy_read_target,omitempty"`
Common common.Config `yaml:"common,omitempty"`
Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
@ -114,6 +116,10 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.UseBufferedLogger, "log.use-buffered", true, "Uses a line-buffered logger to improve performance.")
f.BoolVar(&c.UseSyncLogger, "log.use-sync", true, "Forces all lines logged to hold a mutex to serialize writes.")
//TODO(trevorwhitney): flip this to false with Loki 3.0
f.BoolVar(&c.LegacyReadTarget, "legacy-read-mode", true, "Set to false to disable the legacy read mode and use new scalable mode with 3rd backend target. "+
"The default will be flipped to false in the next Loki release.")
c.registerServerFlagsWithChangedDefaultValues(f)
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
@ -229,6 +235,13 @@ func (c *Config) Validate() error {
return err
}
// Honor the legacy scalable deployment topology
if c.LegacyReadTarget {
if c.isModuleEnabled(Backend) {
return fmt.Errorf("invalid target, cannot run backend target with legacy read mode")
}
}
return nil
}
@ -586,6 +599,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
mm.RegisterModule(Write, nil)
mm.RegisterModule(Backend, nil)
// Add dependencies
deps := map[string][]string{
@ -608,28 +622,33 @@ func (t *Loki) setupModuleManager() error {
IngesterQuerier: {Ring},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor, IndexGateway},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway},
MemberlistKV: {Server},
}
// Add IngesterQuerier as a dependency for store when target is either querier, ruler, or read.
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) {
// Add IngesterQuerier as a dependency for store when target is either querier, ruler, read, or backend.
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(Backend) {
deps[Store] = append(deps[Store], IngesterQuerier)
}
// If the query scheduler and querier are running together, make sure the scheduler goes
// first to initialize the ring that will also be used by the querier
if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) {
if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) {
deps[Querier] = append(deps[Querier], QueryScheduler)
}
// If the query scheduler and query frontend are running together, make sure the scheduler goes
// first to initialize the ring that will also be used by the query frontend
if (t.Cfg.isModuleEnabled(QueryFrontend) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) {
if (t.Cfg.isModuleEnabled(QueryFrontend) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) {
deps[QueryFrontend] = append(deps[QueryFrontend], QueryScheduler)
}
if t.Cfg.LegacyReadTarget {
deps[Read] = append(deps[Read], QueryScheduler, Ruler, Compactor, IndexGateway)
}
if t.Cfg.InternalServer.Enable {
for key, ds := range deps {
idx := -1

@ -99,6 +99,7 @@ const (
All string = "all"
Read string = "read"
Write string = "write"
Backend string = "backend"
UsageReport string = "usage-report"
)
@ -543,7 +544,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
@ -592,12 +593,13 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// Only queriers should use the AsyncStore, it should never be used in ingesters.
asyncStore = true
if t.Cfg.isModuleEnabled(Read) {
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
// The legacy Read target includes the index gateway, so disable the index-gateway client in that configuration.
if t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true
}
case t.Cfg.isModuleEnabled(IndexGateway):
// Backend target includes the index gateway
case t.Cfg.isModuleEnabled(IndexGateway), t.Cfg.isModuleEnabled(Backend):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true
@ -711,7 +713,8 @@ func (t *Loki) supportIndexDeleteRequest() bool {
// compactorAddress returns the configured address of the compactor.
// It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false
func (t *Loki) compactorAddress() (string, bool, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)
if t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
// In single binary, legacy read, or backend modes, this module depends on Server
return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil
}
@ -859,7 +862,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if (t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read)) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)
if (t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend)) && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "Ruler storage is not configured; ruler will not be started.")
return
}
@ -1048,9 +1052,10 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
// IndexGateway runs by default on read target, and should always assume
// IndexGateway runs by default on legacy read and backend targets, and should always assume
// ring mode when run in this way.
if t.isModuleActive(Read) {
legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read)
if legacyReadMode || t.isModuleActive(Backend) {
t.Cfg.IndexGateway.Mode = indexgateway.RingMode
}
@ -1063,7 +1068,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
managerMode := indexgateway.ClientMode
if t.Cfg.isModuleEnabled(IndexGateway) || t.Cfg.isModuleEnabled(Read) {
if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
managerMode = indexgateway.ServerMode
}
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer)

@ -1,6 +1,7 @@
package loki
import (
"fmt"
"path/filepath"
"testing"
"time"
@ -161,33 +162,34 @@ func TestMultiKVSetup(t *testing.T) {
}
}
func TestIndexGatewayRingMode_when_TargetIsRead(t *testing.T) {
func TestIndexGatewayRingMode_when_TargetIsLegacyReadOrBackend(t *testing.T) {
dir := t.TempDir()
t.Run("IndexGateway always set to ring mode when running as part of read target", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, Read)
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(Read)
defer func() {
for _, service := range services {
service.StopAsync()
}
}()
require.NoError(t, err)
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode)
})
type ringModeTestCase struct {
name string
transformer func(cfg *Config)
target string
}
t.Run("When IndexGateway is running independent of Read target", func(t *testing.T) {
t.Run("IndexGateway respects configured simple mode", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, IndexGatewayRing)
cfg.IndexGateway.Mode = indexgateway.SimpleMode
for _, tc := range []ringModeTestCase{
{
name: "leagcy read",
target: Read,
},
{
name: "backend",
target: Backend,
transformer: func(cfg *Config) {
cfg.LegacyReadTarget = false
},
},
} {
t.Run(fmt.Sprintf("IndexGateway always set to ring mode when running as part of %s", tc.name), func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, tc.target, tc.transformer)
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(IndexGateway)
services, err := c.ModuleManager.InitModuleServices(Read)
defer func() {
for _, service := range services {
service.StopAsync()
@ -195,33 +197,71 @@ func TestIndexGatewayRingMode_when_TargetIsRead(t *testing.T) {
}()
require.NoError(t, err)
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.SimpleMode)
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode)
})
}
t.Run("IndexGateway respects configured ring mode", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, IndexGatewayRing)
cfg.IndexGateway.Mode = indexgateway.RingMode
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(IndexGateway)
defer func() {
for _, service := range services {
service.StopAsync()
}
}()
type indexModeTestCase struct {
name string
target string
transformer func(cfg *Config)
}
require.NoError(t, err)
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode)
for _, tc := range []indexModeTestCase{
{
name: "index gateway",
target: IndexGateway,
},
{
name: "new read target",
target: Read,
transformer: func(cfg *Config) {
cfg.LegacyReadTarget = false
},
},
} {
t.Run(fmt.Sprintf("When target is %s", tc.name), func(t *testing.T) {
t.Run("IndexGateway config respects configured simple mode", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, IndexGatewayRing, tc.transformer)
cfg.IndexGateway.Mode = indexgateway.SimpleMode
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(IndexGateway)
defer func() {
for _, service := range services {
service.StopAsync()
}
}()
require.NoError(t, err)
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.SimpleMode)
})
t.Run("IndexGateway config respects configured ring mode", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, IndexGatewayRing)
cfg.IndexGateway.Mode = indexgateway.RingMode
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(IndexGateway)
defer func() {
for _, service := range services {
service.StopAsync()
}
}()
require.NoError(t, err)
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode)
})
})
})
}
}
func TestIndexGatewayClientConfig_when_TargetIsQuerierOrRead(t *testing.T) {
func TestIndexGatewayClientConfig(t *testing.T) {
dir := t.TempDir()
t.Run("IndexGateway client is disabled when running querier target", func(t *testing.T) {
t.Run("IndexGateway client is enabled when running querier target", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, Querier)
cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType
cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour
@ -240,8 +280,56 @@ func TestIndexGatewayClientConfig_when_TargetIsQuerierOrRead(t *testing.T) {
assert.False(t, c.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled)
})
t.Run("IndexGateway client is endabled when running read target", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, Read)
t.Run("IndexGateway client is disabled when running legacy read target", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, Read, func(cfg *Config) {
cfg.LegacyReadTarget = true
})
cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType
cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour
cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem
cfg.CompactorConfig.WorkingDirectory = dir
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(Read)
defer func() {
for _, service := range services {
service.StopAsync()
}
}()
require.NoError(t, err)
assert.True(t, c.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled)
assert.True(t, c.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled)
})
t.Run("IndexGateway client is enabled when running new read target", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, Read, func(cfg *Config) {
cfg.LegacyReadTarget = false
})
cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType
cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour
cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem
cfg.CompactorConfig.WorkingDirectory = dir
c, err := New(cfg)
require.NoError(t, err)
services, err := c.ModuleManager.InitModuleServices(Read)
defer func() {
for _, service := range services {
service.StopAsync()
}
}()
require.NoError(t, err)
assert.False(t, c.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled)
assert.False(t, c.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled)
})
t.Run("IndexGateway client is disabled when running backend target", func(t *testing.T) {
cfg := minimalWorkingConfig(t, dir, Backend, func(cfg *Config) {
cfg.LegacyReadTarget = false
})
cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType
cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour
cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem
@ -264,7 +352,7 @@ func TestIndexGatewayClientConfig_when_TargetIsQuerierOrRead(t *testing.T) {
const localhost = "localhost"
func minimalWorkingConfig(t *testing.T, dir, target string) Config {
func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...func(*Config)) Config {
prepareGlobalMetricsRegistry(t)
cfg := Config{}
@ -315,5 +403,12 @@ func minimalWorkingConfig(t *testing.T, dir, target string) Config {
cfg.Ruler.Config.StoreConfig.Local.Directory = dir
cfg.Common.CompactorAddress = "http://localhost:0"
for _, transformer := range cfgTransformers {
if transformer != nil {
transformer(&cfg)
}
}
return cfg
}

Loading…
Cancel
Save