The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/aggregator/apiserver/apiserver.go

203 lines
7.1 KiB

// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes/kube-aggregator/blob/master/pkg/apiserver/apiserver.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package apiserver
import (
"context"
"net/http"
"time"
"k8s.io/apimachinery/pkg/util/sets"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1"
v0alpha1helper "github.com/grafana/grafana/pkg/aggregator/apis/aggregation/v0alpha1/helper"
clientset "github.com/grafana/grafana/pkg/aggregator/generated/clientset/versioned"
informers "github.com/grafana/grafana/pkg/aggregator/generated/informers/externalversions"
dataplaneservicerest "github.com/grafana/grafana/pkg/aggregator/registry/dataplaneservice/rest"
)
type ExtraConfig struct {
PluginClient PluginClient
PluginContextProvider PluginContextProvider
}
type Config struct {
GenericConfig *genericapiserver.RecommendedConfig
ExtraConfig ExtraConfig
}
type completedConfig struct {
GenericConfig genericapiserver.CompletedConfig
ExtraConfig *ExtraConfig
}
// CompletedConfig same as Config, just to swap private object.
type CompletedConfig struct {
// Embed a private pointer that cannot be instantiated outside of this package.
*completedConfig
}
type runnable interface {
RunWithContext(ctx context.Context) error
}
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
type preparedGrafanaAggregator struct {
*GrafanaAggregator
runnable runnable
}
// GrafanaAggregator contains state for a Kubernetes cluster master/api server.
type GrafanaAggregator struct {
GenericAPIServer *genericapiserver.GenericAPIServer
RegistrationInformers informers.SharedInformerFactory
delegateHandler http.Handler
proxyHandlers map[string]*proxyHandler
handledGroupVersions map[string]sets.Set[string]
PluginClient PluginClient
PluginContextProvider PluginContextProvider
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (cfg *Config) Complete() CompletedConfig {
c := completedConfig{
cfg.GenericConfig.Complete(),
&cfg.ExtraConfig,
}
c.GenericConfig.EnableDiscovery = true
return CompletedConfig{&c}
}
// NewWithDelegate returns a new instance of GrafanaAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*GrafanaAggregator, error) {
genericServer, err := c.GenericConfig.New("grafana-aggregator", delegationTarget)
if err != nil {
return nil, err
}
discoveryHandler := newApisProxyHandler(delegationTarget.UnprotectedHandler())
genericServer.Handler.GoRestfulContainer.Filter(discoveryHandler.handle)
dataplaneServiceRegistrationControllerInitiated := make(chan struct{})
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("DataPlaneServiceRegistrationControllerInitiated", dataplaneServiceRegistrationControllerInitiated); err != nil {
return nil, err
}
dataplaneServiceClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(
dataplaneServiceClient,
5*time.Minute,
)
s := &GrafanaAggregator{
GenericAPIServer: genericServer,
RegistrationInformers: informerFactory,
delegateHandler: delegationTarget.UnprotectedHandler(),
handledGroupVersions: map[string]sets.Set[string]{},
proxyHandlers: map[string]*proxyHandler{},
PluginClient: c.ExtraConfig.PluginClient,
PluginContextProvider: c.ExtraConfig.PluginContextProvider,
}
dataplaneServiceRegistrationController := NewDataPlaneServiceRegistrationController(informerFactory.Aggregation().V0alpha1().DataPlaneServices(), s)
apiGroupInfo := dataplaneservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, true)
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
s.GenericAPIServer.AddPostStartHookOrDie("start-dataplane-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.Done())
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("dataplaneservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go dataplaneServiceRegistrationController.Run(context, dataplaneServiceRegistrationControllerInitiated)
select {
case <-context.Done():
case <-dataplaneServiceRegistrationControllerInitiated:
}
return nil
})
return s, nil
}
// PrepareRun prepares the aggregator to run
func (s *GrafanaAggregator) PrepareRun() (preparedGrafanaAggregator, error) {
prepared := s.GenericAPIServer.PrepareRun()
return preparedGrafanaAggregator{GrafanaAggregator: s, runnable: prepared}, nil
}
func (s preparedGrafanaAggregator) RunWithContext(ctx context.Context) error {
return s.runnable.RunWithContext(ctx)
}
func (s *GrafanaAggregator) AddDataPlaneService(dataplaneService *v0alpha1.DataPlaneService) error {
if proxyHandler, exists := s.proxyHandlers[dataplaneService.Name]; exists {
proxyHandler.updateDataPlaneService(dataplaneService)
return nil
}
proxyPath := "/apis/dataplane/" + dataplaneService.Spec.Group + "/" + dataplaneService.Spec.Version
proxyHandler := &proxyHandler{
localDelegate: s.delegateHandler,
client: s.PluginClient,
pluginContextProvider: s.PluginContextProvider,
}
proxyHandler.updateDataPlaneService(dataplaneService)
s.proxyHandlers[dataplaneService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
versions, exist := s.handledGroupVersions[dataplaneService.Spec.Group]
if exist {
versions.Insert(dataplaneService.Spec.Version)
return nil
}
s.handledGroupVersions[dataplaneService.Spec.Group] = sets.New[string](dataplaneService.Spec.Version)
return nil
}
func (s *GrafanaAggregator) RemoveDataPlaneService(dataplaneServiceName string) {
version := v0alpha1helper.DataPlaneServiceNameToGroupVersion(dataplaneServiceName)
proxyPath := "/apis/dataplane/" + version.Group + "/" + version.Version
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/")
delete(s.proxyHandlers, dataplaneServiceName)
versions, exist := s.handledGroupVersions[version.Group]
if !exist {
return
}
versions.Delete(version.Version)
if versions.Len() > 0 {
return
}
delete(s.handledGroupVersions, version.Group)
groupPath := "/apis/dataplane/" + version.Group
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(groupPath)
s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(groupPath + "/")
}
// DefaultAPIResourceConfigSource returns default configuration for an APIResource.
func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
ret := serverstorage.NewResourceConfig()
ret.EnableVersions(
v0alpha1.SchemeGroupVersion,
)
return ret
}