chore(engine): Use configured storage bucket in execution context (#17583)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/17119/merge
Christian Haudum 1 year ago committed by GitHub
parent 640eff5c25
commit 0ea1097c8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 14
      pkg/engine/engine.go
  2. 6
      pkg/engine/executor/executor.go
  3. 13
      pkg/loki/modules.go
  4. 9
      pkg/querier/http.go

@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/engine/executor"
@ -29,12 +30,19 @@ var (
)
// New creates a new instance of the query engine that implements the [logql.Engine] interface.
func New(opts logql.EngineOpts, metastore metastore.Metastore, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *QueryEngine {
func New(opts logql.EngineOpts, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *QueryEngine {
var ms metastore.Metastore
if bucket != nil {
ms = metastore.NewObjectMetastore(bucket)
}
return &QueryEngine{
logger: logger,
metrics: newMetrics(reg),
limits: limits,
metastore: metastore,
metastore: ms,
bucket: bucket,
opts: opts,
}
}
@ -45,6 +53,7 @@ type QueryEngine struct {
metrics *metrics
limits logql.Limits
metastore metastore.Metastore
bucket objstore.Bucket
opts logql.EngineOpts
}
@ -102,6 +111,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
t = time.Now() // start stopwatch for execution
cfg := executor.Config{
BatchSize: int64(e.opts.BatchSize),
Bucket: e.bucket,
}
pipeline := executor.Run(ctx, cfg, plan)
defer pipeline.Close()

@ -12,7 +12,7 @@ import (
)
type Config struct {
BatchSize int64 `yaml:"batch_size"`
BatchSize int64
Bucket objstore.Bucket
}
@ -64,6 +64,10 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
}
func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObjScan) Pipeline {
if c.bucket == nil {
return errorPipeline(errors.New("no object store bucket configured"))
}
predicates := make([]dataobj.LogsPredicate, 0, len(node.Predicates))
for _, p := range node.Predicates {

@ -16,9 +16,6 @@ import (
"strings"
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -38,8 +35,9 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"github.com/grafana/loki/v3/pkg/analytics"
blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
@ -614,16 +612,15 @@ func (t *Loki) initQuerier() (services.Service, error) {
serverutil.ResponseJSONMiddleware(),
}
var ms metastore.Metastore
var store objstore.Bucket
if t.Cfg.Querier.Engine.EnableV2Engine {
store, err := t.createDataObjBucket("dataobj-querier")
store, err = t.createDataObjBucket("dataobj-querier")
if err != nil {
return nil, err
}
ms = metastore.NewObjectMetastore(store)
}
t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.Overrides, ms, prometheus.DefaultRegisterer, logger)
t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.Overrides, store, prometheus.DefaultRegisterer, logger)
indexStatsHTTPMiddleware := querier.WrapQuerySpanAndTimeout("query.IndexStats", t.Overrides)
indexShardsHTTPMiddleware := querier.WrapQuerySpanAndTimeout("query.IndexShards", t.Overrides)

@ -11,16 +11,15 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/objstore"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/engine"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
@ -55,13 +54,13 @@ type QuerierAPI struct {
}
// NewQuerierAPI returns an instance of the QuerierAPI.
func NewQuerierAPI(cfg Config, querier Querier, limits querier_limits.Limits, metastore metastore.Metastore, reg prometheus.Registerer, logger log.Logger) *QuerierAPI {
func NewQuerierAPI(cfg Config, querier Querier, limits querier_limits.Limits, store objstore.Bucket, reg prometheus.Registerer, logger log.Logger) *QuerierAPI {
return &QuerierAPI{
cfg: cfg,
limits: limits,
querier: querier,
engineV1: logql.NewEngine(cfg.Engine, querier, limits, logger),
engineV2: engine.New(cfg.Engine, metastore, limits, reg, logger),
engineV2: engine.New(cfg.Engine, store, limits, reg, logger),
logger: logger,
}
}

Loading…
Cancel
Save