From fa247353edef6daf25208bccf3e159f6a376cfbd Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Mon, 17 May 2021 10:59:37 -0400 Subject: [PATCH] Add a flag which allows the queriers to run completely standalone and only query stored data. (#3700) --- docs/sources/configuration/_index.md | 5 ++++ pkg/loki/modules.go | 4 +++ pkg/querier/querier.go | 38 +++++++++++++++++----------- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index d30b911f5c..bdaa8937da 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -289,6 +289,11 @@ The `querier_config` block configures the Loki Querier. # CLI flag: -querier.query-ingesters-within [query_ingesters_within: | default = 0s] +# Only query the store, do not attempt to query any ingesters, +# useful for running a standalone querier pool opearting only against stored data. +# CLI flag: -querier.query-store-only +[query_store_only: | default = false] + # Configuration options for the LogQL engine. engine: # Timeout for query execution diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 9cda978acf..ebb21faba7 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -326,6 +326,10 @@ func (t *Loki) initStore() (_ services.Service, err error) { boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg) switch t.Cfg.Target { case Querier, Ruler: + // Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true + if t.Cfg.Querier.QueryStoreOnly { + break + } // Use AsyncStore to query both ingesters local store and chunk store for store queries. // Only queriers should use the AsyncStore, it should never be used in ingesters. chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 716cfe953b..323adf26dd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -46,6 +46,7 @@ type Config struct { IngesterQueryStoreMaxLookback time.Duration `yaml:"-"` Engine logql.EngineOpts `yaml:"engine,omitempty"` MaxConcurrent int `yaml:"max_concurrent"` + QueryStoreOnly bool `yaml:"query_store_only"` } // RegisterFlags register flags. @@ -56,6 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ExtraQueryDelay, "querier.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.") + f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Queriers should only query the store and not try to query any ingesters") } // Querier handlers queries. @@ -96,7 +98,7 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) iters := []iter.EntryIterator{} - if ingesterQueryInterval != nil { + if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { // Make a copy of the request before modifying // because the initial request is used below to query stores queryRequestCopy := *params.QueryRequest @@ -139,7 +141,7 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End) iters := []iter.SampleIterator{} - if ingesterQueryInterval != nil { + if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { // Make a copy of the request before modifying // because the initial request is used below to query stores queryRequestCopy := *params.SampleQueryRequest @@ -261,9 +263,12 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout)) defer cancel() - ingesterValues, err := q.ingesterQuerier.Label(ctx, req) - if err != nil { - return nil, err + var ingesterValues [][]string + if !q.cfg.QueryStoreOnly { + ingesterValues, err = q.ingesterQuerier.Label(ctx, req) + if err != nil { + return nil, err + } } from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) @@ -371,17 +376,20 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) errs := make(chan error, 2) // fetch series from ingesters and store concurrently + if q.cfg.QueryStoreOnly { + series <- [][]logproto.SeriesIdentifier{} + } else { + go func() { + // fetch series identifiers from ingesters + resps, err := q.ingesterQuerier.Series(ctx, req) + if err != nil { + errs <- err + return + } - go func() { - // fetch series identifiers from ingesters - resps, err := q.ingesterQuerier.Series(ctx, req) - if err != nil { - errs <- err - return - } - - series <- resps - }() + series <- resps + }() + } go func() { storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups())