feat(unified-storage): add qos support for the resource server (#105939)

pull/107451/head
Mustafa Sencer Özcan authored 3 weeks ago; committed by GitHub
parent 71a4f20770
commit 974a2c47f9
  1. pkg/server/distributor_test.go (13)
  2. pkg/server/module_server.go (11)
  3. pkg/setting/setting.go (3)
  4. pkg/setting/setting_unified_storage.go (5)
  5. pkg/storage/unified/client.go (49)
  6. pkg/storage/unified/resource/errors.go (16)
  7. pkg/storage/unified/resource/server.go (139)
  8. pkg/storage/unified/sql/server.go (83)
  9. pkg/storage/unified/sql/service.go (116)
  10. pkg/util/scheduler/queue.go (19)
  11. pkg/util/scheduler/queue_test.go (4)
  12. pkg/util/scheduler/scheduler_test.go (12)

@ -352,7 +352,18 @@ func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces
require.NoError(t, err)
searchOpts, err := search.NewSearchOptions(features, cfg, tracer, docBuilders, nil)
require.NoError(t, err)
server, err := sql.NewResourceServer(nil, cfg, tracer, nil, nil, searchOpts, nil, nil, features)
server, err := sql.NewResourceServer(sql.ServerOptions{
DB: nil,
Cfg: cfg,
Tracer: tracer,
Reg: nil,
AccessClient: nil,
SearchOptions: searchOpts,
StorageMetrics: nil,
IndexMetrics: nil,
Features: features,
QOSQueue: nil,
})
require.NoError(t, err)
testUserA := &identity.StaticRequester{

@ -53,7 +53,16 @@ func NewModule(opts Options,
return s, nil
}
func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics, reg prometheus.Registerer, promGatherer prometheus.Gatherer, license licensing.Licensing) (*ModuleServer, error) {
func newModuleServer(opts Options,
apiOpts api.ServerOptions,
features featuremgmt.FeatureToggles,
cfg *setting.Cfg,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
reg prometheus.Registerer,
promGatherer prometheus.Gatherer,
license licensing.Licensing,
) (*ModuleServer, error) {
rootCtx, shutdownFn := context.WithCancel(context.Background())
s := &ModuleServer{

@ -567,6 +567,9 @@ type Cfg struct {
IndexRebuildInterval time.Duration
IndexCacheTTL time.Duration
EnableSharding bool
QOSEnabled bool
QOSNumberWorker int
QOSMaxSizePerTenant int
MemberlistBindAddr string
MemberlistAdvertiseAddr string
MemberlistAdvertisePort int

@ -49,13 +49,16 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
}
cfg.UnifiedStorage = storageConfig
// Set indexer config for unified storaae
// Set indexer config for unified storage
section := cfg.Raw.Section("unified_storage")
cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0)
cfg.IndexPath = section.Key("index_path").String()
cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
cfg.IndexMaxBatchSize = section.Key("index_max_batch_size").MustInt(100)
cfg.EnableSharding = section.Key("enable_sharding").MustBool(false)
cfg.QOSEnabled = section.Key("qos_enabled").MustBool(false)
cfg.QOSNumberWorker = section.Key("qos_num_worker").MustInt(16)
cfg.QOSMaxSizePerTenant = section.Key("qos_max_size_per_tenant").MustInt(1000)
cfg.MemberlistBindAddr = section.Key("memberlist_bind_addr").String()
cfg.MemberlistAdvertiseAddr = section.Key("memberlist_advertise_addr").String()
cfg.MemberlistAdvertisePort = section.Key("memberlist_advertise_port").MustInt(7946)
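Reviewer note: the three new keys are read with safe defaults (QoS off, 16 workers, 1000 queued items per tenant). A minimal sketch of how they parse, using the same gopkg.in/ini.v1 API the setting code above calls; the snippet contents and package name are illustrative only, not part of this commit.

package main

import (
	"fmt"

	"gopkg.in/ini.v1"
)

func main() {
	// Illustrative [unified_storage] snippet; only qos_enabled needs to be set
	// to turn the feature on, the other keys fall back to the defaults above.
	raw := []byte("[unified_storage]\nqos_enabled = true\n")
	f, err := ini.Load(raw)
	if err != nil {
		panic(err)
	}
	sec := f.Section("unified_storage")
	fmt.Println(sec.Key("qos_enabled").MustBool(false))           // true
	fmt.Println(sec.Key("qos_num_worker").MustInt(16))            // 16 (default)
	fmt.Println(sec.Key("qos_max_size_per_tenant").MustInt(1000)) // 1000 (default)
}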

@ -20,6 +20,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@ -31,6 +32,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/grafana/grafana/pkg/util/scheduler"
)
type Options struct {
@ -49,7 +51,10 @@ type clientMetrics struct {
}
// This adds a UnifiedStorage client into the wire dependency tree
func ProvideUnifiedStorageClient(opts *Options, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics) (resource.ResourceClient, error) {
func ProvideUnifiedStorageClient(opts *Options,
storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics,
) (resource.ResourceClient, error) {
// See: apiserver.applyAPIServerConfig(cfg, features, o)
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
client, err := newClient(options.StorageOptions{
@ -83,6 +88,7 @@ func newClient(opts options.StorageOptions,
indexMetrics *resource.BleveIndexMetrics,
) (resource.ResourceClient, error) {
ctx := context.Background()
switch opts.StorageType {
case options.StorageTypeFile:
if opts.DataPath == "" {
@ -146,13 +152,50 @@ func newClient(opts options.StorageOptions,
}
return client, nil
// Use the local SQL
default:
searchOptions, err := search.NewSearchOptions(features, cfg, tracer, docs, indexMetrics)
if err != nil {
return nil, err
}
server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions, storageMetrics, indexMetrics, features)
serverOptions := sql.ServerOptions{
DB: db,
Cfg: cfg,
Tracer: tracer,
Reg: reg,
AccessClient: authzc,
SearchOptions: searchOptions,
StorageMetrics: storageMetrics,
IndexMetrics: indexMetrics,
Features: features,
}
if cfg.QOSEnabled {
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
queue := scheduler.NewQueue(&scheduler.QueueOptions{
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
Registerer: qosReg,
Logger: cfg.Logger,
})
if err := services.StartAndAwaitRunning(ctx, queue); err != nil {
return nil, fmt.Errorf("failed to start queue: %w", err)
}
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker,
Logger: cfg.Logger,
})
if err != nil {
return nil, fmt.Errorf("failed to create scheduler: %w", err)
}
err = services.StartAndAwaitRunning(ctx, scheduler)
if err != nil {
return nil, fmt.Errorf("failed to start scheduler: %w", err)
}
serverOptions.QOSQueue = queue
}
server, err := sql.NewResourceServer(serverOptions)
if err != nil {
return nil, err
}

@ -12,6 +12,7 @@ import (
grpcstatus "google.golang.org/grpc/status"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/scheduler"
)
// Package-level errors.
@ -50,6 +51,14 @@ func NewNotFoundError(key *resourcepb.ResourceKey) *resourcepb.ErrorResult {
}
}
func NewTooManyRequestsError(msg string) *resourcepb.ErrorResult {
return &resourcepb.ErrorResult{
Message: msg,
Code: http.StatusTooManyRequests,
Reason: string(metav1.StatusReasonTooManyRequests),
}
}
// Convert golang errors to status result errors that can be returned to a client
func AsErrorResult(err error) *resourcepb.ErrorResult {
if err == nil {
@ -125,3 +134,10 @@ func GetError(res *resourcepb.ErrorResult) error {
}
return status
}
func HandleQueueError[T any](err error, makeResp func(*resourcepb.ErrorResult) *T) (*T, error) {
if errors.Is(err, scheduler.ErrTenantQueueFull) {
return makeResp(NewTooManyRequestsError("tenant queue is full, please try again later")), nil
}
return makeResp(AsErrorResult(err)), nil
}
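Reviewer note: HandleQueueError is generic over the response type, so each RPC can turn a full tenant queue into a 429 ErrorResult on its own typed response instead of a gRPC transport error. A minimal sketch of one instantiation (package name illustrative; the server methods further down do the same with request-scoped closures):

package main

import (
	"fmt"

	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

func main() {
	// A queue-full error becomes a ReadResponse carrying StatusTooManyRequests
	// with a nil transport error, so clients see a retryable 429.
	rsp, err := resource.HandleQueueError(scheduler.ErrTenantQueueFull, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
		return &resourcepb.ReadResponse{Error: e}
	})
	fmt.Println(rsp.Error.Code, err) // 429 <nil>
}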

@ -19,9 +19,20 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
claims "github.com/grafana/authlib/types"
"github.com/grafana/dskit/backoff"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/util/scheduler"
)
const (
// DefaultMaxBackoff is the default maximum backoff duration for enqueue operations.
DefaultMaxBackoff = 1 * time.Second
// DefaultMinBackoff is the default minimum backoff duration for enqueue operations.
DefaultMinBackoff = 100 * time.Millisecond
// DefaultMaxRetries is the default maximum number of retries for enqueue operations.
DefaultMaxRetries = 3
)
// ResourceServer implements all gRPC services
@ -134,6 +145,10 @@ type BlobSupport interface {
// TODO? List+Delete? This is for admin access
}
type QOSEnqueuer interface {
Enqueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error
}
type BlobConfig struct {
// The CDK configuration URL
URL string
@ -203,7 +218,11 @@ type ResourceServerOptions struct {
IndexMetrics *BleveIndexMetrics
// MaxPageSizeBytes is the maximum size of a page in bytes.
MaxPageSizeBytes int
// QOSQueue is the quality of service queue used to enqueue requests per tenant
QOSQueue QOSEnqueuer
}
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
@ -222,6 +241,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
if opts.Diagnostics == nil {
opts.Diagnostics = &noopService{}
}
if opts.Now == nil {
opts.Now = func() int64 {
return time.Now().UnixMilli()
@ -233,6 +253,10 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
opts.MaxPageSizeBytes = 1024 * 1024 * 2
}
if opts.QOSQueue == nil {
opts.QOSQueue = scheduler.NewNoopQueue()
}
// Initialize the blob storage
blobstore := opts.Blob.Backend
if blobstore == nil {
@ -275,6 +299,8 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
reg: opts.Reg,
queue: opts.QOSQueue,
}
if opts.Search.Resources != nil {
@ -321,6 +347,8 @@ type server struct {
initErr error
maxPageSizeBytes int
reg prometheus.Registerer
queue QOSEnqueuer
}
// Init implements ResourceServer.
@ -570,6 +598,25 @@ func (s *server) Create(ctx context.Context, req *resourcepb.CreateRequest) (*re
return rsp, nil
}
var (
res *resourcepb.CreateResponse
err error
)
runErr := s.runInQueue(ctx, req.Key.Namespace, func(ctx context.Context) {
res, err = s.create(ctx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.CreateResponse {
return &resourcepb.CreateResponse{Error: e}
})
}
return res, err
}
func (s *server) create(ctx context.Context, user claims.AuthInfo, req *resourcepb.CreateRequest) (*resourcepb.CreateResponse, error) {
rsp := &resourcepb.CreateResponse{}
event, e := s.newEvent(ctx, user, req.Key, req.Value, nil)
if e != nil {
rsp.Error = e
@ -605,6 +652,24 @@ func (s *server) Update(ctx context.Context, req *resourcepb.UpdateRequest) (*re
return rsp, nil
}
var (
res *resourcepb.UpdateResponse
err error
)
runErr := s.runInQueue(ctx, req.Key.Namespace, func(ctx context.Context) {
res, err = s.update(ctx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.UpdateResponse {
return &resourcepb.UpdateResponse{Error: e}
})
}
return res, err
}
func (s *server) update(ctx context.Context, user claims.AuthInfo, req *resourcepb.UpdateRequest) (*resourcepb.UpdateResponse, error) {
rsp := &resourcepb.UpdateResponse{}
latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
Key: req.Key,
})
@ -654,6 +719,25 @@ func (s *server) Delete(ctx context.Context, req *resourcepb.DeleteRequest) (*re
return rsp, nil
}
var (
res *resourcepb.DeleteResponse
err error
)
runErr := s.runInQueue(ctx, req.Key.Namespace, func(ctx context.Context) {
res, err = s.delete(ctx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.DeleteResponse {
return &resourcepb.DeleteResponse{Error: e}
})
}
return res, err
}
func (s *server) delete(ctx context.Context, user claims.AuthInfo, req *resourcepb.DeleteRequest) (*resourcepb.DeleteResponse, error) {
rsp := &resourcepb.DeleteResponse{}
latest := s.backend.ReadResource(ctx, &resourcepb.ReadRequest{
Key: req.Key,
})
@ -744,6 +828,23 @@ func (s *server) Read(ctx context.Context, req *resourcepb.ReadRequest) (*resour
return &resourcepb.ReadResponse{Error: NewBadRequestError("missing resource")}, nil
}
var (
res *resourcepb.ReadResponse
err error
)
runErr := s.runInQueue(ctx, req.Key.Namespace, func(ctx context.Context) {
res, err = s.read(ctx, user, req)
})
if runErr != nil {
return HandleQueueError(runErr, func(e *resourcepb.ErrorResult) *resourcepb.ReadResponse {
return &resourcepb.ReadResponse{Error: e}
})
}
return res, err
}
func (s *server) read(ctx context.Context, user claims.AuthInfo, req *resourcepb.ReadRequest) (*resourcepb.ReadResponse, error) {
rsp := s.backend.ReadResource(ctx, req)
if rsp.Error != nil && rsp.Error.Code == http.StatusNotFound {
return &resourcepb.ReadResponse{Error: rsp.Error}, nil
@ -1237,3 +1338,41 @@ func (s *server) GetBlob(ctx context.Context, req *resourcepb.GetBlobRequest) (*
}
return rsp, nil
}
func (s *server) runInQueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error {
boff := backoff.New(ctx, backoff.Config{
MinBackoff: DefaultMinBackoff,
MaxBackoff: DefaultMaxBackoff,
MaxRetries: DefaultMaxRetries,
})
var (
wg sync.WaitGroup
err error
)
wg.Add(1)
wrapped := func(ctx context.Context) {
runnable(ctx)
wg.Done()
}
for boff.Ongoing() {
err = s.queue.Enqueue(ctx, tenantID, wrapped)
if err == nil {
break
}
s.log.Warn("failed to enqueue runnable, retrying",
"maxRetries", DefaultMaxRetries,
"tenantID", tenantID,
"error", err)
boff.Wait()
}
if err != nil {
s.log.Error("failed to enqueue runnable",
"maxRetries", DefaultMaxRetries,
"tenantID", tenantID,
"error", err)
return fmt.Errorf("failed to enqueue runnable for tenant %s: %w", tenantID, err)
}
wg.Wait()
return nil
}
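Reviewer note: when no QOSQueue is supplied, NewResourceServer falls back to scheduler.NewNoopQueue(), so existing callers keep their current behaviour. Any type with a matching Enqueue method satisfies QOSEnqueuer; a pass-through variant (illustrative only, not part of this commit) makes the contract explicit:

package main

import (
	"context"
	"fmt"
)

// passThroughQueue satisfies the QOSEnqueuer interface added above by running
// the work inline, i.e. no per-tenant queueing, fairness, or backpressure.
type passThroughQueue struct{}

func (passThroughQueue) Enqueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error {
	runnable(ctx)
	return nil
}

func main() {
	var q passThroughQueue
	_ = q.Enqueue(context.Background(), "tenant-1", func(context.Context) {
		fmt.Println("ran inline for tenant-1")
	})
}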

@ -1,6 +1,7 @@
package sql
import (
"context"
"os"
"strings"
@ -8,6 +9,7 @@ import (
"go.opentelemetry.io/otel/trace"
"github.com/grafana/authlib/types"
"github.com/grafana/dskit/services"
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/featuremgmt"
@ -17,70 +19,85 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
)
type QOSEnqueueDequeuer interface {
services.Service
Enqueue(ctx context.Context, tenantID string, runnable func(ctx context.Context)) error
Dequeue(ctx context.Context) (func(ctx context.Context), error)
}
// ServerOptions contains the options for creating a new ResourceServer
type ServerOptions struct {
DB infraDB.DB
Cfg *setting.Cfg
Tracer trace.Tracer
Reg prometheus.Registerer
AccessClient types.AccessClient
SearchOptions resource.SearchOptions
StorageMetrics *resource.StorageMetrics
IndexMetrics *resource.BleveIndexMetrics
Features featuremgmt.FeatureToggles
QOSQueue QOSEnqueueDequeuer
}
// Creates a new ResourceServer
func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
tracer trace.Tracer, reg prometheus.Registerer, ac types.AccessClient,
searchOptions resource.SearchOptions, storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics, features featuremgmt.FeatureToggles) (resource.ResourceServer, error) {
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
opts := resource.ResourceServerOptions{
Tracer: tracer,
func NewResourceServer(
opts ServerOptions,
) (resource.ResourceServer, error) {
apiserverCfg := opts.Cfg.SectionWithEnvOverrides("grafana-apiserver")
serverOptions := resource.ResourceServerOptions{
Tracer: opts.Tracer,
Blob: resource.BlobConfig{
URL: apiserverCfg.Key("blob_url").MustString(""),
},
Reg: reg,
Reg: opts.Reg,
}
if ac != nil {
opts.AccessClient = resource.NewAuthzLimitedClient(ac, resource.AuthzOptions{Tracer: tracer, Registry: reg})
if opts.AccessClient != nil {
serverOptions.AccessClient = resource.NewAuthzLimitedClient(opts.AccessClient, resource.AuthzOptions{Tracer: opts.Tracer, Registry: opts.Reg})
}
// Support local file blob
if strings.HasPrefix(opts.Blob.URL, "./data/") {
dir := strings.Replace(opts.Blob.URL, "./data", cfg.DataPath, 1)
if strings.HasPrefix(serverOptions.Blob.URL, "./data/") {
dir := strings.Replace(serverOptions.Blob.URL, "./data", opts.Cfg.DataPath, 1)
err := os.MkdirAll(dir, 0700)
if err != nil {
return nil, err
}
opts.Blob.URL = "file:///" + dir
serverOptions.Blob.URL = "file:///" + dir
}
// This is mostly for testing, being able to influence when we paginate
// based on the page size during tests.
unifiedStorageCfg := cfg.SectionWithEnvOverrides("unified_storage")
unifiedStorageCfg := opts.Cfg.SectionWithEnvOverrides("unified_storage")
maxPageSizeBytes := unifiedStorageCfg.Key("max_page_size_bytes")
opts.MaxPageSizeBytes = maxPageSizeBytes.MustInt(0)
serverOptions.MaxPageSizeBytes = maxPageSizeBytes.MustInt(0)
eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
eDB, err := dbimpl.ProvideResourceDB(opts.DB, opts.Cfg, opts.Tracer)
if err != nil {
return nil, err
}
isHA := isHighAvailabilityEnabled(cfg.SectionWithEnvOverrides("database"),
cfg.SectionWithEnvOverrides("resource_api"))
withPruner := features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
isHA := isHighAvailabilityEnabled(opts.Cfg.SectionWithEnvOverrides("database"),
opts.Cfg.SectionWithEnvOverrides("resource_api"))
withPruner := opts.Features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
store, err := NewBackend(BackendOptions{
DBProvider: eDB,
Tracer: tracer,
Reg: reg,
Tracer: opts.Tracer,
Reg: opts.Reg,
IsHA: isHA,
withPruner: withPruner,
storageMetrics: storageMetrics,
storageMetrics: opts.StorageMetrics,
})
if err != nil {
return nil, err
}
opts.Backend = store
opts.Diagnostics = store
opts.Lifecycle = store
opts.Search = searchOptions
opts.IndexMetrics = indexMetrics
rs, err := resource.NewResourceServer(opts)
if err != nil {
return nil, err
}
serverOptions.Backend = store
serverOptions.Diagnostics = store
serverOptions.Lifecycle = store
serverOptions.Search = opts.SearchOptions
serverOptions.IndexMetrics = opts.IndexMetrics
serverOptions.QOSQueue = opts.QOSQueue
return rs, nil
return resource.NewResourceServer(serverOptions)
}
// isHighAvailabilityEnabled determines if high availability mode should

@ -34,6 +34,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/util/scheduler"
)
var (
@ -50,6 +51,11 @@ type UnifiedStorageGrpcService interface {
type service struct {
*services.BasicService
// Subservices manager
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
hasSubservices bool
cfg *setting.Cfg
features featuremgmt.FeatureToggles
db infraDB.DB
@ -71,6 +77,9 @@ type service struct {
storageRing *ring.Ring
lifecycler *ring.BasicLifecycler
queue QOSEnqueueDequeuer
scheduler *scheduler.Scheduler
}
func ProvideUnifiedStorageGrpcService(
@ -85,6 +94,7 @@ func ProvideUnifiedStorageGrpcService(
storageRing *ring.Ring,
memberlistKVConfig kv.Config,
) (UnifiedStorageGrpcService, error) {
var err error
tracer := otel.Tracer("unified-storage")
// FIXME: This is a temporary solution while we are migrating to the new authn interceptor
@ -95,20 +105,22 @@ func ProvideUnifiedStorageGrpcService(
})
s := &service{
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
docBuilders: docBuilders,
storageMetrics: storageMetrics,
indexMetrics: indexMetrics,
storageRing: storageRing,
cfg: cfg,
features: features,
stopCh: make(chan struct{}),
authenticator: authn,
tracing: tracer,
db: db,
log: log,
reg: reg,
docBuilders: docBuilders,
storageMetrics: storageMetrics,
indexMetrics: indexMetrics,
storageRing: storageRing,
subservicesWatcher: services.NewFailureWatcher(),
}
subservices := []services.Service{}
if cfg.EnableSharding {
ringStore, err := kv.NewClient(
memberlistKVConfig,
@ -143,15 +155,50 @@ func ProvideUnifiedStorageGrpcService(
if err != nil {
return nil, fmt.Errorf("failed to initialize storage-ring lifecycler: %s", err)
}
subservices = append(subservices, s.lifecycler)
}
if cfg.QOSEnabled {
qosReg := prometheus.WrapRegistererWithPrefix("resource_server_qos_", reg)
queue := scheduler.NewQueue(&scheduler.QueueOptions{
MaxSizePerTenant: cfg.QOSMaxSizePerTenant,
Registerer: qosReg,
})
scheduler, err := scheduler.NewScheduler(queue, &scheduler.Config{
NumWorkers: cfg.QOSNumberWorker,
Logger: log,
})
if err != nil {
return nil, fmt.Errorf("failed to create qos scheduler: %s", err)
}
s.queue = queue
s.scheduler = scheduler
subservices = append(subservices, s.queue, s.scheduler)
}
if len(subservices) > 0 {
s.hasSubservices = true
s.subservices, err = services.NewManager(subservices...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
}
// This will be used when running as a dskit service
s.BasicService = services.NewBasicService(s.start, s.running, s.stopping).WithName(modules.StorageServer)
s.BasicService = services.NewBasicService(s.starting, s.running, s.stopping).WithName(modules.StorageServer)
return s, nil
}
func (s *service) start(ctx context.Context) error {
func (s *service) starting(ctx context.Context) error {
if s.hasSubservices {
s.subservicesWatcher.WatchManager(s.subservices)
if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil {
return fmt.Errorf("failed to start subservices: %w", err)
}
}
authzClient, err := authz.ProvideStandaloneAuthZClient(s.cfg, s.features, s.tracing)
if err != nil {
return err
@ -162,7 +209,19 @@ func (s *service) start(ctx context.Context) error {
return err
}
server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions, s.storageMetrics, s.indexMetrics, s.features)
serverOptions := ServerOptions{
DB: s.db,
Cfg: s.cfg,
Tracer: s.tracing,
Reg: s.reg,
AccessClient: authzClient,
SearchOptions: searchOptions,
StorageMetrics: s.storageMetrics,
IndexMetrics: s.indexMetrics,
Features: s.features,
QOSQueue: s.queue,
}
server, err := NewResourceServer(serverOptions)
if err != nil {
return err
}
@ -192,11 +251,6 @@ func (s *service) start(ctx context.Context) error {
}
if s.cfg.EnableSharding {
err = s.lifecycler.StartAsync(ctx)
if err != nil {
return fmt.Errorf("failed to start the lifecycler: %s", err)
}
s.log.Info("waiting until resource server is JOINING in the ring")
lfcCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
@ -231,15 +285,27 @@ func (s *service) GetAddress() string {
func (s *service) running(ctx context.Context) error {
select {
case err := <-s.stoppedCh:
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
case err := <-s.subservicesWatcher.Chan():
return fmt.Errorf("subservice failure: %w", err)
case <-ctx.Done():
close(s.stopCh)
}
return nil
}
func (s *service) stopping(_ error) error {
if s.hasSubservices {
err := services.StopManagerAndAwaitStopped(context.Background(), s.subservices)
if err != nil {
return fmt.Errorf("failed to stop subservices: %w", err)
}
}
return nil
}
type authenticatorWithFallback struct {
authenticator func(ctx context.Context) (context.Context, error)
fallback func(ctx context.Context) (context.Context, error)
@ -309,14 +375,6 @@ func NewAuthenticatorWithFallback(cfg *setting.Cfg, reg prometheus.Registerer, t
}
}
func (s *service) stopping(err error) error {
if err != nil && !errors.Is(err, context.Canceled) {
s.log.Error("stopping unified storage grpc service", "error", err)
return err
}
return nil
}
func toLifecyclerConfig(cfg *setting.Cfg, logger log.Logger) (ring.BasicLifecyclerConfig, error) {
instanceAddr, err := ring.GetInstanceAddr(cfg.MemberlistBindAddr, netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, logger), logger, true)
if err != nil {
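Reviewer note: the queue, scheduler and (when sharding is enabled) the ring lifecycler are now grouped under a dskit services.Manager, started in starting(), watched in running() via the FailureWatcher, and stopped in stopping(). A minimal sketch of that lifecycle pattern with a stand-in subservice (illustrative only, not part of this commit):

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/services"
)

func main() {
	// Stand-in for the queue/scheduler/lifecycler subservices.
	child := services.NewIdleService(
		func(ctx context.Context) error { return nil },
		func(_ error) error { return nil },
	)

	mgr, err := services.NewManager(child)
	if err != nil {
		panic(err)
	}
	watcher := services.NewFailureWatcher()
	watcher.WatchManager(mgr) // running() selects on watcher.Chan() for subservice failures

	ctx := context.Background()
	if err := services.StartManagerAndAwaitHealthy(ctx, mgr); err != nil {
		panic(err)
	}
	// ... serve requests ...
	if err := services.StopManagerAndAwaitStopped(context.Background(), mgr); err != nil {
		panic(err)
	}
	fmt.Println("subservices stopped")
}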

@ -9,6 +9,8 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/grafana/pkg/infra/log"
)
const (
@ -82,6 +84,8 @@ func NewNoopQueue() *NoopQueue {
type Queue struct {
services.Service
logger log.Logger
enqueueChan chan enqueueRequest
dequeueChan chan dequeueRequest
lenChan chan lenRequest
@ -108,6 +112,7 @@ type Queue struct {
type QueueOptions struct {
MaxSizePerTenant int
Registerer prometheus.Registerer
Logger log.Logger
}
// NewQueue creates a new Queue and starts its dispatcher goroutine.
@ -116,7 +121,13 @@ func NewQueue(opts *QueueOptions) *Queue {
opts.MaxSizePerTenant = DefaultMaxSizePerTenant
}
if opts.Logger == nil {
opts.Logger = log.NewNopLogger()
}
q := &Queue{
logger: opts.Logger,
enqueueChan: make(chan enqueueRequest),
dequeueChan: make(chan dequeueRequest),
lenChan: make(chan lenRequest),
@ -226,6 +237,8 @@ func (q *Queue) handleLenRequest(req lenRequest) {
func (q *Queue) dispatcherLoop(ctx context.Context) error {
defer close(q.dispatcherStoppedChan)
q.logger.Info("queue running", "maxSizePerTenant", q.maxSizePerTenant)
for {
q.scheduleRoundRobin()
@ -275,7 +288,6 @@ func (q *Queue) Enqueue(ctx context.Context, tenantID string, runnable func(ctx
select {
case q.enqueueChan <- req:
err = <-respChan
q.enqueueDuration.Observe(time.Since(start).Seconds())
case <-q.dispatcherStoppedChan:
q.discardedRequests.WithLabelValues(tenantID, "dispatcher_stopped").Inc()
err = ErrQueueClosed
@ -283,6 +295,7 @@ func (q *Queue) Enqueue(ctx context.Context, tenantID string, runnable func(ctx
q.discardedRequests.WithLabelValues(tenantID, "context_canceled").Inc()
err = ctx.Err()
}
q.enqueueDuration.Observe(time.Since(start).Seconds())
return err
}
@ -352,6 +365,8 @@ func (q *Queue) ActiveTenantsLen() int {
}
func (q *Queue) stopping(_ error) error {
q.logger.Info("queue stopping")
q.queueLength.Reset()
q.discardedRequests.Reset()
for _, tq := range q.tenantQueues {
@ -359,5 +374,7 @@ func (q *Queue) stopping(_ error) error {
}
q.activeTenants.Init()
q.pendingDequeueRequests.Init()
q.logger.Info("queue stopped")
return nil
}

@ -11,6 +11,7 @@ import (
"time"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
@ -25,6 +26,9 @@ func QueueOptionsWithDefaults(opts *QueueOptions) *QueueOptions {
if opts.Registerer == nil {
opts.Registerer = prometheus.NewRegistry()
}
if opts.Logger == nil {
opts.Logger = log.New("qos.test")
}
return opts
}

@ -2,6 +2,7 @@ package scheduler
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
@ -130,16 +131,16 @@ func TestScheduler(t *testing.T) {
t.Run("ProcessItems", func(t *testing.T) {
t.Parallel()
q := NewQueue(QueueOptionsWithDefaults(nil))
q := NewQueue(QueueOptionsWithDefaults(&QueueOptions{MaxSizePerTenant: 1000}))
require.NoError(t, services.StartAndAwaitRunning(context.Background(), q))
const itemCount = 10
const itemCount = 1000
var processed sync.Map
var wg sync.WaitGroup
wg.Add(itemCount)
scheduler, err := NewScheduler(q, &Config{
NumWorkers: 2,
NumWorkers: 10,
MaxBackoff: 100 * time.Millisecond,
Logger: log.New("qos.test"),
})
@ -148,8 +149,11 @@ func TestScheduler(t *testing.T) {
for i := 0; i < itemCount; i++ {
itemID := i
require.NoError(t, q.Enqueue(context.Background(), "tenant-1", func(_ context.Context) {
tenantIndex := itemID % 10
tenantID := fmt.Sprintf("tenant-%d", tenantIndex)
require.NoError(t, q.Enqueue(context.Background(), tenantID, func(_ context.Context) {
processed.Store(itemID, true)
time.Sleep(10 * time.Millisecond)
wg.Done()
}))
}
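Reviewer note: putting the pieces together, a hedged end-to-end sketch of the queue plus scheduler as these tests exercise them (illustrative wiring only; the tenant ID, sizes and the "qos.example" logger name are made up). A burst beyond MaxSizePerTenant for one tenant surfaces as scheduler.ErrTenantQueueFull, which the resource server maps to HTTP 429 via HandleQueueError.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/util/scheduler"
)

func main() {
	ctx := context.Background()

	q := scheduler.NewQueue(&scheduler.QueueOptions{
		MaxSizePerTenant: 2, // deliberately small; mirrors qos_max_size_per_tenant
		Registerer:       prometheus.NewRegistry(),
		Logger:           log.New("qos.example"),
	})
	if err := services.StartAndAwaitRunning(ctx, q); err != nil {
		panic(err)
	}

	sched, err := scheduler.NewScheduler(q, &scheduler.Config{
		NumWorkers: 2, // mirrors qos_num_worker
		Logger:     log.New("qos.example"),
	})
	if err != nil {
		panic(err)
	}
	if err := services.StartAndAwaitRunning(ctx, sched); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		enqueueErr := q.Enqueue(ctx, "tenant-1", func(_ context.Context) {
			defer wg.Done()
			fmt.Println("processed one item for tenant-1")
		})
		if errors.Is(enqueueErr, scheduler.ErrTenantQueueFull) {
			wg.Done()
			fmt.Println("tenant-1 queue is full") // the server turns this into a 429
		} else if enqueueErr != nil {
			panic(enqueueErr)
		}
	}
	wg.Wait()

	_ = services.StopAndAwaitTerminated(ctx, sched)
	_ = services.StopAndAwaitTerminated(ctx, q)
}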
