From e0a7b28a61e1ef83d62493c52ad1d2a51ba76902 Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Fri, 4 Nov 2022 12:45:42 +0100 Subject: [PATCH] Add single compactor http client for delete and gennumber clients (#7453) --- CHANGELOG.md | 1 + pkg/loki/loki.go | 4 +- pkg/loki/modules.go | 17 +++++--- pkg/querier/queryrange/limits_test.go | 4 +- .../queryrangebase/results_cache.go | 5 ++- .../queryrangebase/results_cache_test.go | 5 +++ pkg/querier/queryrange/roundtrip.go | 5 ++- pkg/querier/queryrange/roundtrip_test.go | 16 +++---- .../compactor/compactor_client.go | 42 +++++++++++++++++++ .../deletion/delete_requests_client.go | 36 ---------------- 10 files changed, 80 insertions(+), 55 deletions(-) create mode 100644 pkg/storage/stores/indexshipper/compactor/compactor_client.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d893a0379..fb65e3d8f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [7270](https://github.com/grafana/loki/pull/7270) **wilfriedroset**: Add support for `username` to redis cache configuration. ##### Fixes +* [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients * [7426](https://github.com/grafana/loki/pull/7426) **periklis**: Add missing compactor delete client tls client config * [7238](https://github.com/grafana/loki/pull/7328) **periklis**: Fix internal server bootstrap for query frontend * [7288](https://github.com/grafana/loki/pull/7288) **ssncferreira**: Fix query mapping in AST mapper `rangemapper` to support the new `VectorExpr` expression. diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 309b381038..0f6d53790b 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -74,7 +74,7 @@ type Config struct { InternalServer internalserver.Config `yaml:"internal_server,omitempty"` Distributor distributor.Config `yaml:"distributor,omitempty"` Querier querier.Config `yaml:"querier,omitempty"` - DeleteClient deletion.Config `yaml:"delete_client,omitempty"` + CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"` IngesterClient client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` StorageConfig storage.Config `yaml:"storage_config,omitempty"` @@ -115,7 +115,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Common.RegisterFlags(f) c.Distributor.RegisterFlags(f) c.Querier.RegisterFlags(f) - c.DeleteClient.RegisterFlags(f) + c.CompactorClient.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) c.StorageConfig.RegisterFlags(f) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 3168e03075..d547cc131b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -350,7 +350,7 @@ func (t *Loki) initQuerier() (services.Service, error) { toMerge := []middleware.Interface{ httpreq.ExtractQueryMetricsMiddleware(), } - if t.supportIndexDeleteRequest() { + if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled { toMerge = append( toMerge, queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader), @@ -660,7 +660,8 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { t.Cfg.QueryRange, util_log.Logger, t.overrides, - t.Cfg.SchemaConfig, t.cacheGenerationLoader, + t.Cfg.SchemaConfig, + t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled, prometheus.DefaultRegisterer, ) if err != nil { @@ -679,7 +680,13 @@ func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) { if err != nil { return nil, err } - client, err = generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) + + httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) + if err != nil { + return nil, err + } + + client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient) if err != nil { return nil, err } @@ -1112,7 +1119,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { } func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) { - if !t.supportIndexDeleteRequest() { + if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil } @@ -1121,7 +1128,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri return nil, err } - httpClient, err := deletion.NewDeleteHTTPClient(t.Cfg.DeleteClient) + httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) if err != nil { return nil, err } diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 678a602fa6..28dd30aff9 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -52,7 +52,7 @@ func Test_seriesLimiter(t *testing.T) { cfg.CacheResults = false // split in 7 with 2 in // max. l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) - tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -237,7 +237,7 @@ func Test_MaxQueryLookBack(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{ maxQueryLookback: 1 * time.Hour, maxQueryParallelism: 1, - }, config.SchemaConfig{}, nil, nil) + }, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index bd6c783f96..24a0cfb7d0 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -163,6 +163,7 @@ type resultsCache struct { merger Merger cacheGenNumberLoader CacheGenNumberLoader shouldCache ShouldCacheFn + retentionEnabled bool metrics *ResultsCacheMetrics } @@ -181,6 +182,7 @@ func NewResultsCacheMiddleware( extractor Extractor, cacheGenNumberLoader CacheGenNumberLoader, shouldCache ShouldCacheFn, + retentionEnabled bool, metrics *ResultsCacheMetrics, ) (Middleware, error) { if cacheGenNumberLoader != nil { @@ -199,6 +201,7 @@ func NewResultsCacheMiddleware( splitter: splitter, cacheGenNumberLoader: cacheGenNumberLoader, shouldCache: shouldCache, + retentionEnabled: retentionEnabled, metrics: metrics, } }), nil @@ -214,7 +217,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { return s.next.Do(ctx, r) } - if s.cacheGenNumberLoader != nil { + if s.cacheGenNumberLoader != nil && s.retentionEnabled { ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs)) } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index 8a4cf0f818..e73989b2e3 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -765,6 +765,7 @@ func TestResultsCache(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -807,6 +808,7 @@ func TestResultsCacheRecent(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -871,6 +873,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -910,6 +913,7 @@ func Test_resultsCache_MissingData(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -1021,6 +1025,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { PrometheusResponseExtractor{}, nil, tc.shouldCache, + false, nil, ) require.NoError(t, err) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 7982db1e2c..bb83bdd4c1 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -45,6 +45,7 @@ func NewTripperware( limits Limits, schema config.SchemaConfig, cacheGenNumLoader queryrangebase.CacheGenNumberLoader, + retentionEnabled bool, registerer prometheus.Registerer, ) (queryrangebase.Tripperware, Stopper, error) { metrics := NewMetrics(registerer) @@ -65,7 +66,7 @@ func NewTripperware( } metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c, - cacheGenNumLoader, PrometheusExtractor{}, metrics, registerer) + cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, registerer) if err != nil { return nil, nil, err } @@ -395,6 +396,7 @@ func NewMetricTripperware( codec queryrangebase.Codec, c cache.Cache, cacheGenNumLoader queryrangebase.CacheGenNumberLoader, + retentionEnabled bool, extractor queryrangebase.Extractor, metrics *Metrics, registerer prometheus.Registerer, @@ -427,6 +429,7 @@ func NewMetricTripperware( func(r queryrangebase.Request) bool { return !r.GetCachingOptions().Disabled }, + retentionEnabled, metrics.ResultsCacheMetrics, ) if err != nil { diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 7aa6acab9b..2a17dc1de2 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -110,7 +110,7 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. func TestMetricsTripperware(t *testing.T) { l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour) - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -173,7 +173,7 @@ func TestMetricsTripperware(t *testing.T) { } func TestLogFilterTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -222,7 +222,7 @@ func TestLogFilterTripperware(t *testing.T) { func TestInstantQueryTripperware(t *testing.T) { testShardingConfig := testConfig testShardingConfig.ShardedQueries = true - tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -258,7 +258,7 @@ func TestInstantQueryTripperware(t *testing.T) { } func TestSeriesTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -299,7 +299,7 @@ func TestSeriesTripperware(t *testing.T) { } func TestLabelsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -345,7 +345,7 @@ func TestLabelsTripperware(t *testing.T) { } func TestLogNoFilter(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -381,7 +381,7 @@ func TestLogNoFilter(t *testing.T) { func TestRegexpParamsSupport(t *testing.T) { l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour) - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -464,7 +464,7 @@ func TestPostQueries(t *testing.T) { } func TestEntriesLimitsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/storage/stores/indexshipper/compactor/compactor_client.go b/pkg/storage/stores/indexshipper/compactor/compactor_client.go new file mode 100644 index 0000000000..63468568dc --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/compactor_client.go @@ -0,0 +1,42 @@ +package compactor + +import ( + "flag" + "net/http" + "time" + + "github.com/grafana/dskit/crypto/tls" +) + +// Config for compactor's generation-number client +type ClientConfig struct { + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { + prefix := "boltdb.shipper.compactor.client" + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, + "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +// NewDeleteHTTPClient return a pointer to a http client instance based on the +// delete client tls settings. +func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 250 + transport.MaxIdleConnsPerHost = 250 + + if cfg.TLSEnabled { + tlsCfg, err := cfg.TLS.GetTLSConfig() + if err != nil { + return nil, err + } + + transport.TLSClientConfig = tlsCfg + } + + return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go index 91c9d30d49..3e8639e50c 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go @@ -3,7 +3,6 @@ package deletion import ( "context" "encoding/json" - "flag" "fmt" "io" "net/http" @@ -14,8 +13,6 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/dskit/crypto/tls" - "github.com/grafana/loki/pkg/util/log" ) @@ -24,20 +21,6 @@ const ( getDeletePath = "/loki/api/v1/delete" ) -// Config for compactor's delete client -type Config struct { - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - prefix := "boltdb.shipper.compactor.delete_client" - f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, - "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) -} - type DeleteRequestsClient interface { GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) Stop() @@ -69,25 +52,6 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { } } -// NewDeleteHTTPClient return a pointer to a http client instance based on the -// delete client tls settings. -func NewDeleteHTTPClient(cfg Config) (*http.Client, error) { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.MaxIdleConns = 250 - transport.MaxIdleConnsPerHost = 250 - - if cfg.TLSEnabled { - tlsCfg, err := cfg.TLS.GetTLSConfig() - if err != nil { - return nil, err - } - - transport.TLSClientConfig = tlsCfg - } - - return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil -} - func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { u, err := url.Parse(addr) if err != nil {