From f5fbfabd8445b5f027d0da896bfa44c66829d705 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Mon, 12 Dec 2022 14:01:22 -0700 Subject: [PATCH] Add 3rd target (`backend`) to SSD/Scalable mode (#7650) **What this PR does / why we need it**: This adds a 3rd target to SSD/Scalable mode, as well as a config flag to run the legacy `read` mode from the original 2 target configuration in order to give people time to migrate before we remove this option in Loki 3.0 (hopefully). The 3rd target has the two major advantages: 1. Allows the read path to be run as a deployment and thus auto-scaled using our existing auto-scaling logic for queriers 2. Creates consistency with Mimir since they went with a 3 target model for their SSD deployment. --- integration/loki_simple_scalable_test.go | 75 +++++++++- pkg/loki/loki.go | 29 +++- pkg/loki/modules.go | 23 +-- pkg/loki/modules_test.go | 181 +++++++++++++++++------ 4 files changed, 250 insertions(+), 58 deletions(-) diff --git a/integration/loki_simple_scalable_test.go b/integration/loki_simple_scalable_test.go index dc6da63477..47616b163e 100644 --- a/integration/loki_simple_scalable_test.go +++ b/integration/loki_simple_scalable_test.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/loki/integration/cluster" ) -func TestSimpleScalableIngestQuery(t *testing.T) { +func TestSimpleScalable_Legacy_IngestQuery(t *testing.T) { clu := cluster.New() defer func() { assert.NoError(t, clu.Cleanup()) @@ -74,3 +74,76 @@ func TestSimpleScalableIngestQuery(t *testing.T) { assert.ElementsMatch(t, []string{"fake"}, resp) }) } + +func TestSimpleScalable_IngestQuery(t *testing.T) { + clu := cluster.New() + defer func() { + assert.NoError(t, clu.Cleanup()) + }() + + var ( + tWrite = clu.AddComponent( + "write", + "-target=write", + ) + tBackend = clu.AddComponent( + "backend", + "-target=backend", + "-legacy-read-mode=false", + ) + ) + require.NoError(t, clu.Run()) + + tRead := clu.AddComponent( + "read", + "-target=read", + "-common.compactor-address="+tBackend.HTTPURL(), + "-legacy-read-mode=false", + ) + require.NoError(t, clu.Run()) + + tenantID := randStringRunes() + + now := time.Now() + cliWrite := client.New(tenantID, "", tWrite.HTTPURL()) + cliWrite.Now = now + cliRead := client.New(tenantID, "", tRead.HTTPURL()) + cliRead.Now = now + cliBackend := client.New(tenantID, "", tBackend.HTTPURL()) + cliBackend.Now = now + + t.Run("ingest logs", func(t *testing.T) { + // ingest some log lines + require.NoError(t, cliWrite.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"})) + require.NoError(t, cliWrite.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"})) + + require.NoError(t, cliWrite.PushLogLine("lineC", map[string]string{"job": "fake"})) + require.NoError(t, cliWrite.PushLogLine("lineD", map[string]string{"job": "fake"})) + }) + + t.Run("query", func(t *testing.T) { + resp, err := cliRead.RunRangeQuery(context.Background(), `{job="fake"}`) + require.NoError(t, err) + assert.Equal(t, "streams", resp.Data.ResultType) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines) + }) + + t.Run("label-names", func(t *testing.T) { + resp, err := cliRead.LabelNames(context.Background()) + require.NoError(t, err) + assert.ElementsMatch(t, []string{"job"}, resp) + }) + + t.Run("label-values", func(t *testing.T) { + resp, err := cliRead.LabelValues(context.Background(), "job") + require.NoError(t, err) + assert.ElementsMatch(t, []string{"fake"}, resp) + }) +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 4ac0eedf37..a6ea1095f0 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -71,6 +71,8 @@ type Config struct { UseBufferedLogger bool `yaml:"use_buffered_logger"` UseSyncLogger bool `yaml:"use_sync_logger"` + LegacyReadTarget bool `yaml:"legacy_read_target,omitempty"` + Common common.Config `yaml:"common,omitempty"` Server server.Config `yaml:"server,omitempty"` InternalServer internalserver.Config `yaml:"internal_server,omitempty"` @@ -114,6 +116,10 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&c.UseBufferedLogger, "log.use-buffered", true, "Uses a line-buffered logger to improve performance.") f.BoolVar(&c.UseSyncLogger, "log.use-sync", true, "Forces all lines logged to hold a mutex to serialize writes.") + //TODO(trevorwhitney): flip this to false with Loki 3.0 + f.BoolVar(&c.LegacyReadTarget, "legacy-read-mode", true, "Set to false to disable the legacy read mode and use new scalable mode with 3rd backend target. "+ + "The default will be flipped to false in the next Loki release.") + c.registerServerFlagsWithChangedDefaultValues(f) c.Common.RegisterFlags(f) c.Distributor.RegisterFlags(f) @@ -229,6 +235,13 @@ func (c *Config) Validate() error { return err } + // Honor the legacy scalable deployment topology + if c.LegacyReadTarget { + if c.isModuleEnabled(Backend) { + return fmt.Errorf("invalid target, cannot run backend target with legacy read mode") + } + } + return nil } @@ -586,6 +599,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) mm.RegisterModule(Write, nil) + mm.RegisterModule(Backend, nil) // Add dependencies deps := map[string][]string{ @@ -608,28 +622,33 @@ func (t *Loki) setupModuleManager() error { IngesterQuerier: {Ring}, IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, - Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor, IndexGateway}, + Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor}, + Backend: {QueryScheduler, Ruler, Compactor, IndexGateway}, MemberlistKV: {Server}, } - // Add IngesterQuerier as a dependency for store when target is either querier, ruler, or read. - if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) { + // Add IngesterQuerier as a dependency for store when target is either querier, ruler, read, or backend. + if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(Backend) { deps[Store] = append(deps[Store], IngesterQuerier) } // If the query scheduler and querier are running together, make sure the scheduler goes // first to initialize the ring that will also be used by the querier - if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) { + if (t.Cfg.isModuleEnabled(Querier) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) { deps[Querier] = append(deps[Querier], QueryScheduler) } // If the query scheduler and query frontend are running together, make sure the scheduler goes // first to initialize the ring that will also be used by the query frontend - if (t.Cfg.isModuleEnabled(QueryFrontend) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(Read) || t.Cfg.isModuleEnabled(All) { + if (t.Cfg.isModuleEnabled(QueryFrontend) && t.Cfg.isModuleEnabled(QueryScheduler)) || t.Cfg.isModuleEnabled(All) { deps[QueryFrontend] = append(deps[QueryFrontend], QueryScheduler) } + if t.Cfg.LegacyReadTarget { + deps[Read] = append(deps[Read], QueryScheduler, Ruler, Compactor, IndexGateway) + } + if t.Cfg.InternalServer.Enable { for key, ds := range deps { idx := -1 diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 467ec947ce..06fe144cf6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -99,6 +99,7 @@ const ( All string = "all" Read string = "read" Write string = "write" + Backend string = "backend" UsageReport string = "usage-report" ) @@ -543,7 +544,7 @@ func (t *Loki) initStore() (_ services.Service, err error) { t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval) - case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway): + case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway): // We do not want query to do any updates to index t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly @@ -592,12 +593,13 @@ func (t *Loki) initStore() (_ services.Service, err error) { // Only queriers should use the AsyncStore, it should never be used in ingesters. asyncStore = true - if t.Cfg.isModuleEnabled(Read) { - // we want to use the actual storage when running the index-gateway, so we remove the Addr from the config + // The legacy Read target includes the index gateway, so disable the index-gateway client in that configuration. + if t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) { t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true } - case t.Cfg.isModuleEnabled(IndexGateway): + // Backend target includes the index gateway + case t.Cfg.isModuleEnabled(IndexGateway), t.Cfg.isModuleEnabled(Backend): // we want to use the actual storage when running the index-gateway, so we remove the Addr from the config t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true @@ -711,7 +713,8 @@ func (t *Loki) supportIndexDeleteRequest() bool { // compactorAddress returns the configured address of the compactor. // It prefers grpc address over http. If the address is grpc then the bool would be true otherwise false func (t *Loki) compactorAddress() (string, bool, error) { - if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) { + legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) + if t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) { // In single binary or read modes, this module depends on Server return fmt.Sprintf("%s:%d", t.Cfg.Server.GRPCListenAddress, t.Cfg.Server.GRPCListenPort), true, nil } @@ -859,7 +862,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) { // unfortunately there is no way to generate a "default" config and compare default against actual // to determine if it's unconfigured. the following check, however, correctly tests this. // Single binary integration tests will break if this ever drifts - if (t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read)) && t.Cfg.Ruler.StoreConfig.IsDefaults() { + legacyReadMode := t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read) + if (t.Cfg.isModuleEnabled(All) || legacyReadMode || t.Cfg.isModuleEnabled(Backend)) && t.Cfg.Ruler.StoreConfig.IsDefaults() { level.Info(util_log.Logger).Log("msg", "Ruler storage is not configured; ruler will not be started.") return } @@ -1048,9 +1052,10 @@ func (t *Loki) initIndexGateway() (services.Service, error) { } func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { - // IndexGateway runs by default on read target, and should always assume + // IndexGateway runs by default on legacy read and backend targets, and should always assume // ring mode when run in this way. - if t.isModuleActive(Read) { + legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read) + if legacyReadMode || t.isModuleActive(Backend) { t.Cfg.IndexGateway.Mode = indexgateway.RingMode } @@ -1063,7 +1068,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort managerMode := indexgateway.ClientMode - if t.Cfg.isModuleEnabled(IndexGateway) || t.Cfg.isModuleEnabled(Read) { + if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) { managerMode = indexgateway.ServerMode } rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer) diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index dad7663c34..ddc45a4755 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -1,6 +1,7 @@ package loki import ( + "fmt" "path/filepath" "testing" "time" @@ -161,33 +162,34 @@ func TestMultiKVSetup(t *testing.T) { } } -func TestIndexGatewayRingMode_when_TargetIsRead(t *testing.T) { +func TestIndexGatewayRingMode_when_TargetIsLegacyReadOrBackend(t *testing.T) { dir := t.TempDir() - t.Run("IndexGateway always set to ring mode when running as part of read target", func(t *testing.T) { - cfg := minimalWorkingConfig(t, dir, Read) - c, err := New(cfg) - require.NoError(t, err) - - services, err := c.ModuleManager.InitModuleServices(Read) - defer func() { - for _, service := range services { - service.StopAsync() - } - }() - - require.NoError(t, err) - assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode) - }) + type ringModeTestCase struct { + name string + transformer func(cfg *Config) + target string + } - t.Run("When IndexGateway is running independent of Read target", func(t *testing.T) { - t.Run("IndexGateway respects configured simple mode", func(t *testing.T) { - cfg := minimalWorkingConfig(t, dir, IndexGatewayRing) - cfg.IndexGateway.Mode = indexgateway.SimpleMode + for _, tc := range []ringModeTestCase{ + { + name: "leagcy read", + target: Read, + }, + { + name: "backend", + target: Backend, + transformer: func(cfg *Config) { + cfg.LegacyReadTarget = false + }, + }, + } { + t.Run(fmt.Sprintf("IndexGateway always set to ring mode when running as part of %s", tc.name), func(t *testing.T) { + cfg := minimalWorkingConfig(t, dir, tc.target, tc.transformer) c, err := New(cfg) require.NoError(t, err) - services, err := c.ModuleManager.InitModuleServices(IndexGateway) + services, err := c.ModuleManager.InitModuleServices(Read) defer func() { for _, service := range services { service.StopAsync() @@ -195,33 +197,71 @@ func TestIndexGatewayRingMode_when_TargetIsRead(t *testing.T) { }() require.NoError(t, err) - assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.SimpleMode) + assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode) }) + } - t.Run("IndexGateway respects configured ring mode", func(t *testing.T) { - cfg := minimalWorkingConfig(t, dir, IndexGatewayRing) - cfg.IndexGateway.Mode = indexgateway.RingMode - c, err := New(cfg) - require.NoError(t, err) - - services, err := c.ModuleManager.InitModuleServices(IndexGateway) - defer func() { - for _, service := range services { - service.StopAsync() - } - }() + type indexModeTestCase struct { + name string + target string + transformer func(cfg *Config) + } - require.NoError(t, err) - assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode) + for _, tc := range []indexModeTestCase{ + { + name: "index gateway", + target: IndexGateway, + }, + { + name: "new read target", + target: Read, + transformer: func(cfg *Config) { + cfg.LegacyReadTarget = false + }, + }, + } { + t.Run(fmt.Sprintf("When target is %s", tc.name), func(t *testing.T) { + t.Run("IndexGateway config respects configured simple mode", func(t *testing.T) { + cfg := minimalWorkingConfig(t, dir, IndexGatewayRing, tc.transformer) + cfg.IndexGateway.Mode = indexgateway.SimpleMode + c, err := New(cfg) + require.NoError(t, err) + + services, err := c.ModuleManager.InitModuleServices(IndexGateway) + defer func() { + for _, service := range services { + service.StopAsync() + } + }() + + require.NoError(t, err) + assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.SimpleMode) + }) + + t.Run("IndexGateway config respects configured ring mode", func(t *testing.T) { + cfg := minimalWorkingConfig(t, dir, IndexGatewayRing) + cfg.IndexGateway.Mode = indexgateway.RingMode + c, err := New(cfg) + require.NoError(t, err) + + services, err := c.ModuleManager.InitModuleServices(IndexGateway) + defer func() { + for _, service := range services { + service.StopAsync() + } + }() + + require.NoError(t, err) + assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode) + }) }) - - }) + } } -func TestIndexGatewayClientConfig_when_TargetIsQuerierOrRead(t *testing.T) { +func TestIndexGatewayClientConfig(t *testing.T) { dir := t.TempDir() - t.Run("IndexGateway client is disabled when running querier target", func(t *testing.T) { + t.Run("IndexGateway client is enabled when running querier target", func(t *testing.T) { cfg := minimalWorkingConfig(t, dir, Querier) cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour @@ -240,8 +280,56 @@ func TestIndexGatewayClientConfig_when_TargetIsQuerierOrRead(t *testing.T) { assert.False(t, c.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled) }) - t.Run("IndexGateway client is endabled when running read target", func(t *testing.T) { - cfg := minimalWorkingConfig(t, dir, Read) + t.Run("IndexGateway client is disabled when running legacy read target", func(t *testing.T) { + cfg := minimalWorkingConfig(t, dir, Read, func(cfg *Config) { + cfg.LegacyReadTarget = true + }) + cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType + cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour + cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem + cfg.CompactorConfig.WorkingDirectory = dir + c, err := New(cfg) + require.NoError(t, err) + + services, err := c.ModuleManager.InitModuleServices(Read) + defer func() { + for _, service := range services { + service.StopAsync() + } + }() + + require.NoError(t, err) + assert.True(t, c.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled) + assert.True(t, c.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled) + }) + + t.Run("IndexGateway client is enabled when running new read target", func(t *testing.T) { + cfg := minimalWorkingConfig(t, dir, Read, func(cfg *Config) { + cfg.LegacyReadTarget = false + }) + cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType + cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour + cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem + cfg.CompactorConfig.WorkingDirectory = dir + c, err := New(cfg) + require.NoError(t, err) + + services, err := c.ModuleManager.InitModuleServices(Read) + defer func() { + for _, service := range services { + service.StopAsync() + } + }() + + require.NoError(t, err) + assert.False(t, c.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled) + assert.False(t, c.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled) + }) + + t.Run("IndexGateway client is disabled when running backend target", func(t *testing.T) { + cfg := minimalWorkingConfig(t, dir, Backend, func(cfg *Config) { + cfg.LegacyReadTarget = false + }) cfg.SchemaConfig.Configs[0].IndexType = config.BoltDBShipperType cfg.SchemaConfig.Configs[0].IndexTables.Period = 24 * time.Hour cfg.CompactorConfig.SharedStoreType = config.StorageTypeFileSystem @@ -264,7 +352,7 @@ func TestIndexGatewayClientConfig_when_TargetIsQuerierOrRead(t *testing.T) { const localhost = "localhost" -func minimalWorkingConfig(t *testing.T, dir, target string) Config { +func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...func(*Config)) Config { prepareGlobalMetricsRegistry(t) cfg := Config{} @@ -315,5 +403,12 @@ func minimalWorkingConfig(t *testing.T, dir, target string) Config { cfg.Ruler.Config.StoreConfig.Local.Directory = dir cfg.Common.CompactorAddress = "http://localhost:0" + + for _, transformer := range cfgTransformers { + if transformer != nil { + transformer(&cfg) + } + } + return cfg }