mirror of https://github.com/grafana/grafana
Remove kube-aggregator from OSS (#103659)
* feat: remove kube-aggregator for OSS and provide injection points with runner iface * upgrade authlib to support expiresIn * new FT * new FT again * update go.mod * get rid of the slice implementation * reconcile conflicts * gracefully handle enterprise not being linked situation with kubeAggregator FT true * allow dataplane agg and kube agg to both be added to delegate chain * make update-workspace * address feedback * revert go.mod changes * go.mod updates * elaborate on why and how of skipping the Ready channel handling * after rebase and make run98279-update-grafana-versionrelease-link-on-login-page-to-the-whats-new-doc
parent
da36279312
commit
aa2cf8e398
@ -1,127 +0,0 @@ |
||||
# aggregator |
||||
|
||||
This is a package that is intended to power the aggregation of microservices within Grafana. The concept |
||||
as well as implementation is largely borrowed from [kube-aggregator](https://github.com/kubernetes/kube-aggregator). |
||||
|
||||
## Why aggregate services? |
||||
|
||||
Grafana's future architecture will entail the same API Server design as that of Kubernetes API Servers. API Servers |
||||
provide a standard way of stitching together API Groups through discovery and shared routing patterns that allows |
||||
them to aggregate to a parent API Server in a seamless manner. Since we desire to break Grafana monolith up into |
||||
more functionally divided microservices, aggregation does the job of still being able to provide these services |
||||
under a single address. Other benefits of aggregation include free health checks and being able to independently |
||||
roll out features for each service without downtime. |
||||
|
||||
To read more about the concept, see |
||||
[here](https://kubernetes.io/docs/tasks/extend-kubernetes/setup-extension-api-server/). |
||||
|
||||
Note that this aggregation will be a totally internal detail to Grafana. External fully functional API Servers that |
||||
may themselves act as parent API Servers to Grafana will never be made aware of internal Grafana API Servers. |
||||
Thus, any `APIService` objects corresponding to Grafana's API groups will take the address of |
||||
Grafana's main API Server (the one that bundles grafana-aggregator). |
||||
|
||||
Also, note that the single binary OSS offering of Grafana doesn't make use of the aggregator component at all, instead |
||||
opting for local installation of all the Grafana API groups. |
||||
|
||||
### kube-aggregator versus grafana-aggregator |
||||
|
||||
The `grafana-aggregator` component will work similarly to how `kube-aggregator` works for `kube-apiserver`, the major |
||||
difference being that it doesn't require core V1 APIs such as `Service`. Early on, we decided to not have core V1 |
||||
APIs in the root Grafana API Server. In order to still be able to implement aggregation, we do the following in this Go |
||||
package: |
||||
|
||||
1. We do not start the core shared informer factories as well as any default controllers that utilize them. |
||||
This is achieved using `DisabledPostStartHooks` facility under the GenericAPIServer's RecommendedConfig. |
||||
2. We provide an `externalname` Kind API implementation under `service.grafana.app` group which works functionally |
||||
equivalent to the idea with the same name under `core/v1/Service`. |
||||
3. Lastly, we swap the default available condition controller with the custom one written by us. This one is based on |
||||
our `externalname` (`service.grafana.app`) implementation. We register separate `PostStartHooks` |
||||
using `AddPostStartHookOrDie` on the GenericAPIServer to start the corresponding custom controller as well as |
||||
requisite informer factories for our own `externalname` Kind. |
||||
4. For now, we bundle apiextensions-apiserver under our aggregator component. This is slightly different from K8s |
||||
where kube-apiserver is called the top-level component and controlplane, aggregator and apiextensions-apiserver |
||||
live under that instead. |
||||
|
||||
### Gotchas (Pay Attention) |
||||
|
||||
1. `grafana-aggregator` uses file storage under `data/grafana-apiserver` (`apiregistration.k8s.io`, |
||||
`service.grafana.app`). Thus, any restarts will still have any prior configured aggregation in effect. |
||||
2. During local development, ensure you start the aggregated service after launching the aggregator. This is |
||||
so you have TLS and kubeconfig available for use with example aggregated api servers. |
||||
3. Ensure you have `grafanaAPIServerWithExperimentalAPIs = false` in your custom.ini. Otherwise, the example |
||||
service the following guide uses for the aggregation test is bundled as a `Local` `APIService` and will cause |
||||
configuration overwrites on startup. |
||||
|
||||
## Testing aggregation locally |
||||
|
||||
1. Generate the PKI using `openssl` (for development purposes, we will use the CN of `system:masters`): |
||||
```shell |
||||
./hack/make-aggregator-pki.sh |
||||
``` |
||||
2. Configure the aggregator: |
||||
```ini |
||||
[feature_toggles] |
||||
grafanaAPIServerEnsureKubectlAccess = true |
||||
; disable the experimental APIs flag to disable bundling of the example service locally |
||||
grafanaAPIServerWithExperimentalAPIs = false |
||||
kubernetesAggregator = true |
||||
|
||||
[grafana-apiserver] |
||||
proxy_client_cert_file = ./data/grafana-aggregator/client.crt |
||||
proxy_client_key_file = ./data/grafana-aggregator/client.key |
||||
``` |
||||
3. Start the server |
||||
```shell |
||||
make run |
||||
``` |
||||
4. In another tab, apply the manifests: |
||||
```shell |
||||
export KUBECONFIG=$PWD/data/grafana-apiserver/grafana.kubeconfig |
||||
kubectl apply -f ./pkg/services/apiserver/aggregator/examples/manual-test/ |
||||
# SAMPLE OUTPUT |
||||
# apiservice.apiregistration.k8s.io/v0alpha1.example.grafana.app created |
||||
# externalname.service.grafana.app/example-apiserver created |
||||
|
||||
kubectl get apiservice |
||||
# SAMPLE OUTPUT |
||||
# NAME SERVICE AVAILABLE AGE |
||||
# v0alpha1.example.grafana.app grafana/example-apiserver False (FailedDiscoveryCheck) 29m |
||||
``` |
||||
5. In another tab, start the example microservice that will be aggregated by the parent apiserver: |
||||
```shell |
||||
go run ./pkg/cmd/grafana apiserver \ |
||||
--runtime-config=example.grafana.app/v0alpha1=true \ |
||||
--secure-port 7443 \ |
||||
--tls-cert-file $PWD/data/grafana-aggregator/server.crt \ |
||||
--tls-private-key-file $PWD/data/grafana-aggregator/server.key \ |
||||
--requestheader-client-ca-file=$PWD/data/grafana-aggregator/ca.crt \ |
||||
--requestheader-extra-headers-prefix=X-Remote-Extra- \ |
||||
--requestheader-group-headers=X-Remote-Group \ |
||||
--requestheader-username-headers=X-Remote-User \ |
||||
-v 10 |
||||
``` |
||||
6. After 10 seconds, check `APIService` again. It should report as available. |
||||
```shell |
||||
export KUBECONFIG=$PWD/data/grafana-apiserver/grafana.kubeconfig |
||||
kubectl get apiservice |
||||
# SAMPLE OUTPUT |
||||
# NAME SERVICE AVAILABLE AGE |
||||
# v0alpha1.example.grafana.app grafana/example-apiserver True 30m |
||||
``` |
||||
7. For tear down of the above test: |
||||
```shell |
||||
kubectl delete -f ./pkg/services/apiserver/aggregator/examples/ |
||||
``` |
||||
|
||||
## Testing auto-registration of remote services locally |
||||
|
||||
A sample aggregation config for remote services is provided under [conf](../../../../conf/aggregation/apiservices.yaml). Provided, you have the following setup in your custom.ini, the apiserver will |
||||
register your remotely running services on startup. |
||||
|
||||
```ini |
||||
; in custom.ini |
||||
; the bundle is only used when not in dev mode |
||||
apiservice_ca_bundle_file = ./data/grafana-aggregator/ca.crt |
||||
|
||||
remote_services_file = ./pkg/services/apiserver/aggregator/examples/autoregister/apiservices.yaml |
||||
``` |
@ -1,509 +0,0 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/aggregator.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/server.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/pkg/controlplane/apiserver/apiextensions.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package aggregator |
||||
|
||||
import ( |
||||
"crypto/tls" |
||||
"fmt" |
||||
"io" |
||||
"net/http" |
||||
"os" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"gopkg.in/yaml.v3" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
utilnet "k8s.io/apimachinery/pkg/util/net" |
||||
"k8s.io/apimachinery/pkg/util/sets" |
||||
genericapiserver "k8s.io/apiserver/pkg/server" |
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates" |
||||
"k8s.io/apiserver/pkg/server/healthz" |
||||
"k8s.io/client-go/informers" |
||||
"k8s.io/client-go/kubernetes/fake" |
||||
"k8s.io/client-go/tools/cache" |
||||
"k8s.io/component-base/metrics" |
||||
"k8s.io/component-base/metrics/legacyregistry" |
||||
"k8s.io/klog/v2" |
||||
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" |
||||
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" |
||||
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" |
||||
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" |
||||
apiregistrationclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" |
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" |
||||
apiregistrationInformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" |
||||
"k8s.io/kube-aggregator/pkg/controllers" |
||||
"k8s.io/kube-aggregator/pkg/controllers/autoregister" |
||||
|
||||
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1" |
||||
servicev0alpha1applyconfiguration "github.com/grafana/grafana/pkg/generated/applyconfiguration/service/v0alpha1" |
||||
serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned" |
||||
informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions" |
||||
"github.com/grafana/grafana/pkg/registry/apis/service" |
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder" |
||||
"github.com/grafana/grafana/pkg/services/apiserver/options" |
||||
) |
||||
|
||||
// making sure we only register metrics once into legacy registry
|
||||
var registerIntoLegacyRegistryOnce sync.Once |
||||
|
||||
//nolint:unused
|
||||
func _readCABundlePEM(path string, devMode bool) ([]byte, error) { |
||||
if devMode { |
||||
return nil, nil |
||||
} |
||||
|
||||
// We can ignore the gosec G304 warning on this one because `path` comes
|
||||
// from Grafana configuration (commandOptions.AggregatorOptions.APIServiceCABundleFile)
|
||||
//nolint:gosec
|
||||
f, err := os.Open(path) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer func() { |
||||
if err := f.Close(); err != nil { |
||||
klog.Errorf("error closing remote services file: %s", err) |
||||
} |
||||
}() |
||||
|
||||
return io.ReadAll(f) |
||||
} |
||||
|
||||
func ReadRemoteServices(path string) ([]RemoteService, error) { |
||||
// We can ignore the gosec G304 warning on this one because `path` comes
|
||||
// from Grafana configuration (commandOptions.AggregatorOptions.RemoteServicesFile)
|
||||
//nolint:gosec
|
||||
f, err := os.Open(path) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer func() { |
||||
if err := f.Close(); err != nil { |
||||
klog.Errorf("error closing remote services file: %s", err) |
||||
} |
||||
}() |
||||
|
||||
rawRemoteServices, err := io.ReadAll(f) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
remoteServices := make([]RemoteService, 0) |
||||
if err := yaml.Unmarshal(rawRemoteServices, &remoteServices); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return remoteServices, nil |
||||
} |
||||
|
||||
func CreateAggregatorConfig(commandOptions *options.Options, sharedConfig genericapiserver.RecommendedConfig, externalNamesNamespace string) (*Config, error) { |
||||
// Create a fake clientset and informers for the k8s v1 API group.
|
||||
// These are not used in grafana's aggregator because v1 APIs are not available.
|
||||
fakev1Informers := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute) |
||||
|
||||
serviceClient, err := serviceclientset.NewForConfig(sharedConfig.LoopbackClientConfig) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
sharedInformerFactory := informersv0alpha1.NewSharedInformerFactory( |
||||
serviceClient, |
||||
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
||||
) |
||||
serviceResolver := NewExternalNameResolver(sharedInformerFactory.Service().V0alpha1().ExternalNames().Lister()) |
||||
|
||||
aggregatorConfig := &aggregatorapiserver.Config{ |
||||
GenericConfig: &genericapiserver.RecommendedConfig{ |
||||
Config: sharedConfig.Config, |
||||
SharedInformerFactory: fakev1Informers, |
||||
ClientConfig: sharedConfig.LoopbackClientConfig, |
||||
}, |
||||
ExtraConfig: aggregatorapiserver.ExtraConfig{ |
||||
DisableRemoteAvailableConditionController: true, |
||||
// NOTE: while ProxyTransport can be skipped in the configuration, it allows honoring
|
||||
// DISABLE_HTTP2, HTTPS_PROXY and NO_PROXY env vars as needed
|
||||
ProxyTransport: createProxyTransport(), |
||||
ServiceResolver: serviceResolver, |
||||
}, |
||||
} |
||||
|
||||
if commandOptions.KubeAggregatorOptions.LegacyClientCertAuth { |
||||
// NOTE: the availability controller below is a bit different and uses the cert/key pair regardless
|
||||
// of the legacy bool, this is because we are still using that for discovery requests
|
||||
aggregatorConfig.ExtraConfig.ProxyClientCertFile = commandOptions.KubeAggregatorOptions.ProxyClientCertFile |
||||
aggregatorConfig.ExtraConfig.ProxyClientKeyFile = commandOptions.KubeAggregatorOptions.ProxyClientKeyFile |
||||
} |
||||
|
||||
customExtraConfig := &CustomExtraConfig{ |
||||
DiscoveryOnlyProxyClientCertFile: commandOptions.KubeAggregatorOptions.ProxyClientCertFile, |
||||
DiscoveryOnlyProxyClientKeyFile: commandOptions.KubeAggregatorOptions.ProxyClientKeyFile, |
||||
} |
||||
|
||||
if err := commandOptions.KubeAggregatorOptions.ApplyTo(aggregatorConfig, commandOptions.RecommendedOptions.Etcd); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
serviceAPIBuilder := service.NewServiceAPIBuilder() |
||||
if err := serviceAPIBuilder.InstallSchema(aggregatorscheme.Scheme); err != nil { |
||||
return nil, err |
||||
} |
||||
APIVersionPriorities[serviceAPIBuilder.GetGroupVersion()] = Priority{Group: 15000, Version: int32(1)} |
||||
|
||||
// Exit early, if no remote services file is configured
|
||||
if commandOptions.KubeAggregatorOptions.RemoteServicesFile == "" { |
||||
return NewConfig(aggregatorConfig, customExtraConfig, sharedInformerFactory, []builder.APIGroupBuilder{serviceAPIBuilder}, nil), nil |
||||
} |
||||
|
||||
remoteServices, err := ReadRemoteServices(commandOptions.KubeAggregatorOptions.RemoteServicesFile) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
remoteServicesConfig := &RemoteServicesConfig{ |
||||
// TODO: in practice, we should only use the insecure flag when commandOptions.ExtraOptions.DevMode == true
|
||||
// But given the bug in K8s, we are forced to set it to true until the below PR is merged and available
|
||||
// https://github.com/kubernetes/kubernetes/pull/123808
|
||||
InsecureSkipTLSVerify: true, |
||||
ExternalNamesNamespace: externalNamesNamespace, |
||||
// TODO: CABundle can't be set when insecure is true
|
||||
// CABundle: caBundlePEM,
|
||||
Services: remoteServices, |
||||
serviceClientSet: serviceClient, |
||||
} |
||||
|
||||
return NewConfig(aggregatorConfig, customExtraConfig, sharedInformerFactory, []builder.APIGroupBuilder{serviceAPIBuilder}, remoteServicesConfig), nil |
||||
} |
||||
|
||||
// CreateAggregatorServer creates an aggregated server to layer into the existing apiserver
|
||||
// TODO: passing options temporarily as that allows us to pass in cert/key for client into AvailableController but skip it in the aggregator lib
|
||||
func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.DelegationTarget, reg prometheus.Registerer) (*aggregatorapiserver.APIAggregator, error) { |
||||
aggregatorConfig := config.KubeAggregatorConfig |
||||
sharedInformerFactory := config.Informers |
||||
remoteServicesConfig := config.RemoteServicesConfig |
||||
externalNamesInformer := sharedInformerFactory.Service().V0alpha1().ExternalNames() |
||||
completedConfig := aggregatorConfig.Complete() |
||||
|
||||
aggregatorServer, err := completedConfig.NewWithDelegate(delegateAPIServer) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// create controllers for auto-registration
|
||||
apiRegistrationClient, err := apiregistrationclient.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient) |
||||
|
||||
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController) |
||||
|
||||
// Imbue all builtin group-priorities onto the aggregated discovery
|
||||
if completedConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil { |
||||
for gv, entry := range APIVersionPriorities { |
||||
completedConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.Group), int(entry.Version)) |
||||
} |
||||
} |
||||
|
||||
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { |
||||
go autoRegistrationController.Run(5, context.Done()) |
||||
return nil |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if remoteServicesConfig != nil { |
||||
addRemoteAPIServicesToRegister(remoteServicesConfig, autoRegistrationController) |
||||
externalNames := getRemoteExternalNamesToRegister(remoteServicesConfig) |
||||
err = aggregatorServer.GenericAPIServer.AddPostStartHook("grafana-apiserver-remote-autoregistration", func(ctx genericapiserver.PostStartHookContext) error { |
||||
controllers.WaitForCacheSync("grafana-apiserver-remote-autoregistration", ctx.Done(), externalNamesInformer.Informer().HasSynced) |
||||
namespacedClient := remoteServicesConfig.serviceClientSet.ServiceV0alpha1().ExternalNames(remoteServicesConfig.ExternalNamesNamespace) |
||||
for _, externalName := range externalNames { |
||||
_, err := namespacedClient.Apply(ctx, externalName, metav1.ApplyOptions{ |
||||
FieldManager: "grafana-aggregator", |
||||
Force: true, |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
|
||||
err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks( |
||||
makeAPIServiceAvailableHealthCheck( |
||||
"autoregister-completion", |
||||
apiServices, |
||||
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), |
||||
), |
||||
) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
apiregistrationClient, err := apiregistrationclientset.NewForConfig(completedConfig.GenericConfig.LoopbackClientConfig) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
proxyCurrentCertKeyContentFunc := func() ([]byte, []byte) { |
||||
return nil, nil |
||||
} |
||||
if len(config.CustomExtraConfig.DiscoveryOnlyProxyClientCertFile) > 0 && len(config.CustomExtraConfig.DiscoveryOnlyProxyClientKeyFile) > 0 { |
||||
aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", config.CustomExtraConfig.DiscoveryOnlyProxyClientCertFile, config.CustomExtraConfig.DiscoveryOnlyProxyClientKeyFile) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
proxyCurrentCertKeyContentFunc = func() ([]byte, []byte) { |
||||
return aggregatorProxyCerts.CurrentCertKeyContent() |
||||
} |
||||
} |
||||
|
||||
registry := legacyregistry.DefaultGatherer.(metrics.KubeRegistry) |
||||
availibilityMetrics := newAvailabilityMetrics() |
||||
// create shared (remote and local) availability metrics
|
||||
// TODO: decouple from legacyregistry
|
||||
registerIntoLegacyRegistryOnce.Do(func() { err = availibilityMetrics.Register(registry.Register, registry.CustomRegister) }) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
availableController, err := NewAvailableConditionController( |
||||
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), |
||||
externalNamesInformer, |
||||
apiregistrationClient.ApiregistrationV1(), |
||||
nil, |
||||
proxyCurrentCertKeyContentFunc, |
||||
completedConfig.ExtraConfig.ServiceResolver, |
||||
availibilityMetrics, |
||||
) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
aggregatorServer.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-override-available-controller", func(context genericapiserver.PostStartHookContext) error { |
||||
// if we end up blocking for long periods of time, we may need to increase workers.
|
||||
go availableController.Run(5, context.Done()) |
||||
return nil |
||||
}) |
||||
|
||||
aggregatorServer.GenericAPIServer.AddPostStartHookOrDie("start-grafana-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { |
||||
sharedInformerFactory.Start(context.Done()) |
||||
aggregatorServer.APIRegistrationInformers.Start(context.Done()) |
||||
return nil |
||||
}) |
||||
|
||||
serviceAPIGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(servicev0alpha1.GROUP, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs) |
||||
for _, b := range config.Builders { |
||||
err := b.UpdateAPIGroupInfo( |
||||
&serviceAPIGroupInfo, |
||||
builder.APIGroupOptions{ |
||||
Scheme: aggregatorscheme.Scheme, |
||||
OptsGetter: aggregatorConfig.GenericConfig.RESTOptionsGetter, |
||||
DualWriteBuilder: nil, // no dual writer
|
||||
MetricsRegister: reg, |
||||
}, |
||||
) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
|
||||
if err := aggregatorServer.GenericAPIServer.InstallAPIGroup(&serviceAPIGroupInfo); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return aggregatorServer, nil |
||||
} |
||||
|
||||
func makeAPIService(gv schema.GroupVersion) *v1.APIService { |
||||
apiServicePriority, ok := APIVersionPriorities[gv] |
||||
if !ok { |
||||
// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
|
||||
// being permanently stuck in the APIServices list.
|
||||
klog.Infof("Skipping APIService creation for %v", gv) |
||||
return nil |
||||
} |
||||
return &v1.APIService{ |
||||
ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group}, |
||||
Spec: v1.APIServiceSpec{ |
||||
Group: gv.Group, |
||||
Version: gv.Version, |
||||
GroupPriorityMinimum: apiServicePriority.Group, |
||||
VersionPriority: apiServicePriority.Version, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
// makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
|
||||
// once all of the specified services have been observed to be available at least once.
|
||||
func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer apiregistrationInformers.APIServiceInformer) healthz.HealthChecker { |
||||
// Track the auto-registered API services that have not been observed to be available yet
|
||||
pendingServiceNamesLock := &sync.RWMutex{} |
||||
pendingServiceNames := sets.NewString() |
||||
for _, service := range apiServices { |
||||
pendingServiceNames.Insert(service.Name) |
||||
} |
||||
|
||||
// When an APIService in the list is seen as available, remove it from the pending list
|
||||
handleAPIServiceChange := func(service *v1.APIService) { |
||||
pendingServiceNamesLock.Lock() |
||||
defer pendingServiceNamesLock.Unlock() |
||||
if !pendingServiceNames.Has(service.Name) { |
||||
return |
||||
} |
||||
if v1helper.IsAPIServiceConditionTrue(service, v1.Available) { |
||||
pendingServiceNames.Delete(service.Name) |
||||
} |
||||
} |
||||
|
||||
// Watch add/update events for APIServices
|
||||
_, err := apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
||||
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) }, |
||||
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) }, |
||||
}) |
||||
if err != nil { |
||||
klog.Errorf("Failed to watch APIServices for health check: %v", err) |
||||
} |
||||
|
||||
// Don't return healthy until the pending list is empty
|
||||
return healthz.NamedCheck(name, func(r *http.Request) error { |
||||
pendingServiceNamesLock.RLock() |
||||
defer pendingServiceNamesLock.RUnlock() |
||||
if pendingServiceNames.Len() > 0 { |
||||
klog.Error("APIServices not yet available", "services", pendingServiceNames.List()) |
||||
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List()) |
||||
} |
||||
return nil |
||||
}) |
||||
} |
||||
|
||||
// Priority defines group Priority that is used in discovery. This controls
|
||||
// group position in the kubectl output.
|
||||
type Priority struct { |
||||
// Group indicates the order of the Group relative to other groups.
|
||||
Group int32 |
||||
// Version indicates the relative order of the Version inside of its group.
|
||||
Version int32 |
||||
} |
||||
|
||||
// APIVersionPriorities are the proper way to resolve this letting the aggregator know the desired group and version-within-group order of the underlying servers
|
||||
// is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
|
||||
// This requires the APIGroupInfo struct to evolve and include the concept of priorities and to avoid mistakes, the core storage map there needs to be updated.
|
||||
// That ripples out every bit as far as you'd expect, so for 1.7 we'll include the list here instead of being built up during storage.
|
||||
var APIVersionPriorities = map[schema.GroupVersion]Priority{ |
||||
{Group: "", Version: "v1"}: {Group: 18000, Version: 1}, |
||||
// to my knowledge, nothing below here collides
|
||||
{Group: "admissionregistration.k8s.io", Version: "v1"}: {Group: 16700, Version: 15}, |
||||
{Group: "admissionregistration.k8s.io", Version: "v1beta1"}: {Group: 16700, Version: 12}, |
||||
{Group: "admissionregistration.k8s.io", Version: "v1alpha1"}: {Group: 16700, Version: 9}, |
||||
// Append a new group to the end of the list if unsure.
|
||||
// You can use min(existing group)-100 as the initial value for a group.
|
||||
// Version can be set to 9 (to have space around) for a new group.
|
||||
} |
||||
|
||||
func addRemoteAPIServicesToRegister(config *RemoteServicesConfig, registration autoregister.AutoAPIServiceRegistration) { |
||||
for i, service := range config.Services { |
||||
port := service.Port |
||||
apiService := &v1.APIService{ |
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Version + "." + service.Group}, |
||||
Spec: v1.APIServiceSpec{ |
||||
Group: service.Group, |
||||
Version: service.Version, |
||||
InsecureSkipTLSVerify: config.InsecureSkipTLSVerify, |
||||
CABundle: config.CABundle, |
||||
// TODO: Group priority minimum of 1000 more than for local services, figure out a better story
|
||||
// when we have multiple versions, potentially running in heterogeneous ways (local and remote)
|
||||
GroupPriorityMinimum: 16000, |
||||
VersionPriority: 1 + int32(i), |
||||
Service: &v1.ServiceReference{ |
||||
Name: service.Version + "." + service.Group, |
||||
Namespace: config.ExternalNamesNamespace, |
||||
Port: &port, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
registration.AddAPIServiceToSyncOnStart(apiService) |
||||
} |
||||
} |
||||
|
||||
func getRemoteExternalNamesToRegister(config *RemoteServicesConfig) []*servicev0alpha1applyconfiguration.ExternalNameApplyConfiguration { |
||||
externalNames := make([]*servicev0alpha1applyconfiguration.ExternalNameApplyConfiguration, 0) |
||||
|
||||
for _, service := range config.Services { |
||||
host := service.Host |
||||
name := service.Version + "." + service.Group |
||||
externalName := &servicev0alpha1applyconfiguration.ExternalNameApplyConfiguration{} |
||||
externalName.WithAPIVersion(servicev0alpha1.SchemeGroupVersion.String()) |
||||
externalName.WithKind("ExternalName") |
||||
externalName.WithName(name) |
||||
externalName.WithSpec(&servicev0alpha1applyconfiguration.ExternalNameSpecApplyConfiguration{ |
||||
Host: &host, |
||||
}) |
||||
externalNames = append(externalNames, externalName) |
||||
} |
||||
|
||||
return externalNames |
||||
} |
||||
|
||||
func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService { |
||||
apiServices := []*v1.APIService{} |
||||
|
||||
for _, curr := range delegateAPIServer.ListedPaths() { |
||||
if curr == "/api/v1" { |
||||
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"}) |
||||
registration.AddAPIServiceToSyncOnStart(apiService) |
||||
apiServices = append(apiServices, apiService) |
||||
continue |
||||
} |
||||
|
||||
if !strings.HasPrefix(curr, "/apis/") { |
||||
continue |
||||
} |
||||
// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
|
||||
tokens := strings.Split(curr, "/") |
||||
if len(tokens) != 4 { |
||||
continue |
||||
} |
||||
|
||||
apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]}) |
||||
if apiService == nil { |
||||
continue |
||||
} |
||||
registration.AddAPIServiceToSyncOnStart(apiService) |
||||
apiServices = append(apiServices, apiService) |
||||
} |
||||
|
||||
return apiServices |
||||
} |
||||
|
||||
// NOTE: below function imported from https://github.com/kubernetes/kubernetes/blob/master/cmd/kube-apiserver/app/server.go#L197
|
||||
// createProxyTransport creates the dialer infrastructure to connect to the api servers.
|
||||
func createProxyTransport() *http.Transport { |
||||
// NOTE: We don't set proxyDialerFn but the below SetTransportDefaults will
|
||||
// See https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apimachinery/pkg/util/net/http.go#L109
|
||||
var proxyDialerFn utilnet.DialFunc |
||||
// Proxying to services is IP-based... don't expect to be able to verify the hostname
|
||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true} |
||||
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ |
||||
DialContext: proxyDialerFn, |
||||
TLSClientConfig: proxyTLSClientConfig, |
||||
}) |
||||
return proxyTransport |
||||
} |
@ -1,71 +0,0 @@ |
||||
package aggregator_test |
||||
|
||||
import ( |
||||
"sort" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" |
||||
genericapiserver "k8s.io/apiserver/pkg/server" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
"k8s.io/client-go/informers" |
||||
"k8s.io/client-go/kubernetes/fake" |
||||
clientrest "k8s.io/client-go/rest" |
||||
utilversion "k8s.io/component-base/version" |
||||
"k8s.io/kube-aggregator/pkg/apiserver" |
||||
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" |
||||
aggregatoropenapi "k8s.io/kube-aggregator/pkg/generated/openapi" |
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore" |
||||
) |
||||
|
||||
// TestAggregatorPostStartHooks tests that the kube-aggregator server has the expected default post start hooks enabled.
|
||||
func TestAggregatorPostStartHooks(t *testing.T) { |
||||
cfg := apiserver.Config{ |
||||
GenericConfig: genericapiserver.NewRecommendedConfig(aggregatorscheme.Codecs), |
||||
ExtraConfig: apiserver.ExtraConfig{}, |
||||
} |
||||
|
||||
cfg.GenericConfig.ExternalAddress = "127.0.0.1:6443" |
||||
cfg.GenericConfig.EffectiveVersion = utilversion.DefaultBuildEffectiveVersion() |
||||
cfg.GenericConfig.LoopbackClientConfig = &clientrest.Config{} |
||||
cfg.GenericConfig.MergedResourceConfig = apiserver.DefaultAPIResourceConfigSource() |
||||
|
||||
// Add OpenAPI config, which depends on builders
|
||||
namer := openapinamer.NewDefinitionNamer(aggregatorscheme.Scheme) |
||||
cfg.GenericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(aggregatoropenapi.GetOpenAPIDefinitions, namer) |
||||
cfg.GenericConfig.OpenAPIV3Config.Info.Title = "Kubernetes" |
||||
cfg.GenericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(aggregatoropenapi.GetOpenAPIDefinitions, namer) |
||||
cfg.GenericConfig.OpenAPIConfig.Info.Title = "Kubernetes" |
||||
cfg.GenericConfig.SkipOpenAPIInstallation = true |
||||
cfg.GenericConfig.SharedInformerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 10*time.Minute) |
||||
|
||||
// override the RESTOptionsGetter to use the in memory storage options
|
||||
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(*storagebackend.NewDefaultConfig("memory", nil)) |
||||
require.NoError(t, err) |
||||
cfg.GenericConfig.RESTOptionsGetter = restOptionsGetter |
||||
|
||||
complete := cfg.Complete() |
||||
|
||||
server, err := complete.NewWithDelegate(genericapiserver.NewEmptyDelegate()) |
||||
require.NoError(t, err) |
||||
|
||||
actual := make([]string, 0, len(server.GenericAPIServer.PostStartHooks())) |
||||
for k := range server.GenericAPIServer.PostStartHooks() { |
||||
actual = append(actual, k) |
||||
} |
||||
sort.Strings(actual) |
||||
expected := []string{ |
||||
"apiservice-discovery-controller", |
||||
"generic-apiserver-start-informers", |
||||
"max-in-flight-filter", |
||||
"storage-object-count-tracker-hook", |
||||
"start-kube-aggregator-informers", |
||||
"apiservice-status-local-available-controller", |
||||
"apiservice-status-remote-available-controller", |
||||
"apiservice-registration-controller", |
||||
} |
||||
sort.Strings(expected) |
||||
require.Equal(t, expected, actual) |
||||
} |
@ -1,490 +0,0 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kube-aggregator/blob/master/pkg/controllers/status/available_controller.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package aggregator |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"net/http" |
||||
"net/url" |
||||
"reflect" |
||||
"sync" |
||||
"time" |
||||
|
||||
"k8s.io/apimachinery/pkg/api/equality" |
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
"k8s.io/apimachinery/pkg/api/meta" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/labels" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
||||
"k8s.io/apimachinery/pkg/util/wait" |
||||
"k8s.io/client-go/tools/cache" |
||||
"k8s.io/client-go/transport" |
||||
"k8s.io/client-go/util/workqueue" |
||||
"k8s.io/klog/v2" |
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" |
||||
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" |
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" |
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" |
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" |
||||
"k8s.io/kube-aggregator/pkg/controllers" |
||||
|
||||
"github.com/grafana/grafana/pkg/apis/service/v0alpha1" |
||||
informersservicev0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions/service/v0alpha1" |
||||
listersservicev0alpha1 "github.com/grafana/grafana/pkg/generated/listers/service/v0alpha1" |
||||
) |
||||
|
||||
type certKeyFunc func() ([]byte, []byte) |
||||
|
||||
// ServiceResolver knows how to convert a service reference into an actual location.
|
||||
type ServiceResolver interface { |
||||
ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) |
||||
} |
||||
|
||||
// AvailableConditionController handles checking the availability of registered API services.
|
||||
type AvailableConditionController struct { |
||||
apiServiceClient apiregistrationclient.APIServicesGetter |
||||
|
||||
apiServiceLister listers.APIServiceLister |
||||
apiServiceSynced cache.InformerSynced |
||||
|
||||
// externalNameLister is used to get the IP to create the transport for
|
||||
externalNameLister listersservicev0alpha1.ExternalNameLister |
||||
servicesSynced cache.InformerSynced |
||||
|
||||
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
|
||||
proxyTransportDial *transport.DialHolder |
||||
proxyCurrentCertKeyContent certKeyFunc |
||||
serviceResolver ServiceResolver |
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error |
||||
|
||||
queue workqueue.TypedRateLimitingInterface[string] |
||||
// map from service-namespace -> service-name -> apiservice names
|
||||
cache map[string]map[string][]string |
||||
// this lock protects operations on the above cache
|
||||
cacheLock sync.RWMutex |
||||
metrics *Metrics |
||||
} |
||||
|
||||
// NewAvailableConditionController returns a new AvailableConditionController.
|
||||
func NewAvailableConditionController( |
||||
apiServiceInformer informers.APIServiceInformer, |
||||
externalNameInformer informersservicev0alpha1.ExternalNameInformer, |
||||
apiServiceClient apiregistrationclient.APIServicesGetter, |
||||
proxyTransportDial *transport.DialHolder, |
||||
proxyCurrentCertKeyContent certKeyFunc, |
||||
serviceResolver ServiceResolver, |
||||
metrics *Metrics, |
||||
) (*AvailableConditionController, error) { |
||||
c := &AvailableConditionController{ |
||||
apiServiceClient: apiServiceClient, |
||||
apiServiceLister: apiServiceInformer.Lister(), |
||||
externalNameLister: externalNameInformer.Lister(), |
||||
serviceResolver: serviceResolver, |
||||
queue: workqueue.NewTypedRateLimitingQueueWithConfig( |
||||
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
|
||||
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
|
||||
// the maximum disruption time to a minimum, but it does prevent hot loops.
|
||||
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second), |
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "AvailableConditionController"}, |
||||
), |
||||
proxyTransportDial: proxyTransportDial, |
||||
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, |
||||
metrics: metrics, |
||||
} |
||||
|
||||
// resync on this one because it is low cardinality and rechecking the actual discovery
|
||||
// allows us to detect health in a more timely fashion when network connectivity to
|
||||
// nodes is snipped, but the network still attempts to route there. See
|
||||
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
|
||||
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod( |
||||
cache.ResourceEventHandlerFuncs{ |
||||
AddFunc: c.addAPIService, |
||||
UpdateFunc: c.updateAPIService, |
||||
DeleteFunc: c.deleteAPIService, |
||||
}, |
||||
30*time.Second) |
||||
c.apiServiceSynced = apiServiceHandler.HasSynced |
||||
|
||||
serviceHandler, _ := externalNameInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
||||
AddFunc: c.addService, |
||||
UpdateFunc: c.updateService, |
||||
DeleteFunc: c.deleteService, |
||||
}) |
||||
c.servicesSynced = serviceHandler.HasSynced |
||||
|
||||
c.syncFn = c.sync |
||||
|
||||
return c, nil |
||||
} |
||||
|
||||
func (c *AvailableConditionController) sync(key string) error { |
||||
originalAPIService, err := c.apiServiceLister.Get(key) |
||||
if apierrors.IsNotFound(err) { |
||||
if originalAPIService.Spec.Service != nil { |
||||
// Only reset state, if the service was a remote service
|
||||
c.metrics.ForgetAPIService(key) |
||||
} |
||||
return nil |
||||
} |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// if a particular transport was specified, use that otherwise build one
|
||||
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
|
||||
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
|
||||
transportConfig := &transport.Config{ |
||||
TLS: transport.TLSConfig{ |
||||
Insecure: true, |
||||
}, |
||||
DialHolder: c.proxyTransportDial, |
||||
} |
||||
|
||||
if c.proxyCurrentCertKeyContent != nil { |
||||
proxyClientCert, proxyClientKey := c.proxyCurrentCertKeyContent() |
||||
|
||||
transportConfig.TLS.CertData = proxyClientCert |
||||
transportConfig.TLS.KeyData = proxyClientKey |
||||
} |
||||
restTransport, err := transport.New(transportConfig) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
discoveryClient := &http.Client{ |
||||
Transport: restTransport, |
||||
// the request should happen quickly.
|
||||
Timeout: 5 * time.Second, |
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error { |
||||
return http.ErrUseLastResponse |
||||
}, |
||||
} |
||||
|
||||
apiService := originalAPIService.DeepCopy() |
||||
|
||||
availableCondition := apiregistrationv1.APIServiceCondition{ |
||||
Type: apiregistrationv1.Available, |
||||
Status: apiregistrationv1.ConditionTrue, |
||||
LastTransitionTime: metav1.Now(), |
||||
} |
||||
|
||||
// local API services are always considered available
|
||||
if apiService.Spec.Service == nil { |
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition()) |
||||
_, err := c.updateAPIServiceStatus(originalAPIService, apiService) |
||||
return err |
||||
} |
||||
|
||||
_, err = c.externalNameLister.ExternalNames(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name) |
||||
if apierrors.IsNotFound(err) { |
||||
availableCondition.Status = apiregistrationv1.ConditionFalse |
||||
availableCondition.Reason = "ServiceNotFound" |
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace) |
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) |
||||
_, err := c.updateAPIServiceStatus(originalAPIService, apiService) |
||||
return err |
||||
} else if err != nil { |
||||
availableCondition.Status = apiregistrationv1.ConditionUnknown |
||||
availableCondition.Reason = "ServiceAccessError" |
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err) |
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) |
||||
_, err := c.updateAPIServiceStatus(originalAPIService, apiService) |
||||
return err |
||||
} |
||||
|
||||
// actually try to hit the discovery endpoint when it isn't local and when we're routing as a service.
|
||||
if apiService.Spec.Service != nil && c.serviceResolver != nil { |
||||
attempts := 5 |
||||
results := make(chan error, attempts) |
||||
for i := 0; i < attempts; i++ { |
||||
go func() { |
||||
// stagger these requests to reduce pressure on aggregated services
|
||||
waitDuration := time.Second * time.Duration(int32(i)) |
||||
time.Sleep(waitDuration) |
||||
|
||||
discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, *apiService.Spec.Service.Port) |
||||
if err != nil { |
||||
results <- err |
||||
return |
||||
} |
||||
// render legacyAPIService health check path when it is delegated to a service
|
||||
if apiService.Name == "v1." { |
||||
discoveryURL.Path = "/api/" + apiService.Spec.Version |
||||
} else { |
||||
discoveryURL.Path = "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version |
||||
} |
||||
|
||||
errCh := make(chan error, 1) |
||||
go func() { |
||||
// be sure to check a URL that the aggregated API server is required to serve
|
||||
newReq, err := http.NewRequest("GET", discoveryURL.String(), nil) |
||||
if err != nil { |
||||
errCh <- err |
||||
return |
||||
} |
||||
|
||||
// setting the system-masters identity ensures that we will always have access rights
|
||||
uid := "" |
||||
var extra map[string][]string |
||||
transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", uid, []string{"system:masters"}, extra) |
||||
resp, err := discoveryClient.Do(newReq) |
||||
if resp != nil { |
||||
_ = resp.Body.Close() |
||||
// we should always been in the 200s or 300s
|
||||
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { |
||||
errCh <- fmt.Errorf("bad status from %v: %d", discoveryURL, resp.StatusCode) |
||||
return |
||||
} |
||||
} |
||||
|
||||
errCh <- err |
||||
}() |
||||
|
||||
select { |
||||
case err = <-errCh: |
||||
if err != nil { |
||||
results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err) |
||||
return |
||||
} |
||||
|
||||
// we had trouble with slow dial and DNS responses causing us to wait too long.
|
||||
// we added this as insurance
|
||||
case <-time.After(6 * time.Second): |
||||
results <- fmt.Errorf("timed out waiting for %v", discoveryURL) |
||||
return |
||||
} |
||||
|
||||
results <- nil |
||||
}() |
||||
} |
||||
|
||||
var lastError error |
||||
for i := 0; i < attempts; i++ { |
||||
lastError = <-results |
||||
// if we had at least one success, we are successful overall and we can return now
|
||||
if lastError == nil { |
||||
break |
||||
} |
||||
} |
||||
|
||||
if lastError != nil { |
||||
availableCondition.Status = apiregistrationv1.ConditionFalse |
||||
availableCondition.Reason = "FailedDiscoveryCheck" |
||||
availableCondition.Message = lastError.Error() |
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) |
||||
_, updateErr := c.updateAPIServiceStatus(originalAPIService, apiService) |
||||
if updateErr != nil { |
||||
return updateErr |
||||
} |
||||
// force a requeue to make it very obvious that this will be retried at some point in the future
|
||||
// along with other requeues done via service change, endpoint change, and resync
|
||||
return lastError |
||||
} |
||||
} |
||||
|
||||
availableCondition.Reason = "Passed" |
||||
availableCondition.Message = "all checks passed" |
||||
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition) |
||||
_, err = c.updateAPIServiceStatus(originalAPIService, apiService) |
||||
return err |
||||
} |
||||
|
||||
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
|
||||
// apiservices. Doing that means we don't want to quickly issue no-op updates.
|
||||
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { |
||||
// update this metric on every sync operation to reflect the actual state
|
||||
if newAPIService.Spec.Service != nil { |
||||
// Only expose the metric for remote services, trusts the type on the new object
|
||||
c.metrics.SetUnavailableGauge(newAPIService) |
||||
} |
||||
|
||||
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) { |
||||
return newAPIService, nil |
||||
} |
||||
|
||||
orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available) |
||||
now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available) |
||||
unknown := apiregistrationv1.APIServiceCondition{ |
||||
Type: apiregistrationv1.Available, |
||||
Status: apiregistrationv1.ConditionUnknown, |
||||
} |
||||
if orig == nil { |
||||
orig = &unknown |
||||
} |
||||
if now == nil { |
||||
now = &unknown |
||||
} |
||||
if *orig != *now { |
||||
klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason) |
||||
} |
||||
|
||||
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if newAPIService.Spec.Service != nil { |
||||
// Only expose the metric for remote services, trusts the type on the new object
|
||||
c.metrics.SetUnavailableCounter(originalAPIService, newAPIService) |
||||
} |
||||
return newAPIService, nil |
||||
} |
||||
|
||||
// Run starts the AvailableConditionController loop which manages the availability condition of API services.
|
||||
func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) { |
||||
defer utilruntime.HandleCrash() |
||||
defer c.queue.ShutDown() |
||||
|
||||
klog.Info("Starting AvailableConditionController") |
||||
defer klog.Info("Shutting down AvailableConditionController") |
||||
|
||||
// This waits not just for the informers to sync, but for our handlers
|
||||
// to be called; since the handlers are three different ways of
|
||||
// enqueueing the same thing, waiting for this permits the queue to
|
||||
// maximally de-duplicate the entries.
|
||||
if !controllers.WaitForCacheSync("AvailableConditionCOverrideController", stopCh, c.apiServiceSynced, c.servicesSynced) { |
||||
return |
||||
} |
||||
|
||||
for i := 0; i < workers; i++ { |
||||
go wait.Until(c.runWorker, time.Second, stopCh) |
||||
} |
||||
|
||||
<-stopCh |
||||
} |
||||
|
||||
func (c *AvailableConditionController) runWorker() { |
||||
for c.processNextWorkItem() { |
||||
} |
||||
} |
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *AvailableConditionController) processNextWorkItem() bool { |
||||
key, quit := c.queue.Get() |
||||
if quit { |
||||
return false |
||||
} |
||||
defer c.queue.Done(key) |
||||
|
||||
err := c.syncFn(key) |
||||
if err == nil { |
||||
c.queue.Forget(key) |
||||
return true |
||||
} |
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err)) |
||||
c.queue.AddRateLimited(key) |
||||
|
||||
return true |
||||
} |
||||
|
||||
func (c *AvailableConditionController) addAPIService(obj interface{}) { |
||||
castObj := obj.(*apiregistrationv1.APIService) |
||||
klog.V(4).Infof("Adding %s", castObj.Name) |
||||
if castObj.Spec.Service != nil { |
||||
c.rebuildAPIServiceCache() |
||||
} |
||||
c.queue.Add(castObj.Name) |
||||
} |
||||
|
||||
func (c *AvailableConditionController) updateAPIService(oldObj, newObj interface{}) { |
||||
castObj := newObj.(*apiregistrationv1.APIService) |
||||
oldCastObj := oldObj.(*apiregistrationv1.APIService) |
||||
klog.V(4).Infof("Updating %s", oldCastObj.Name) |
||||
if !reflect.DeepEqual(castObj.Spec.Service, oldCastObj.Spec.Service) { |
||||
c.rebuildAPIServiceCache() |
||||
} |
||||
c.queue.Add(oldCastObj.Name) |
||||
} |
||||
|
||||
func (c *AvailableConditionController) deleteAPIService(obj interface{}) { |
||||
castObj, ok := obj.(*apiregistrationv1.APIService) |
||||
if !ok { |
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
||||
if !ok { |
||||
klog.Errorf("Couldn't get object from tombstone %#v", obj) |
||||
return |
||||
} |
||||
castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService) |
||||
if !ok { |
||||
klog.Errorf("Tombstone contained object that is not expected %#v", obj) |
||||
return |
||||
} |
||||
} |
||||
klog.V(4).Infof("Deleting %q", castObj.Name) |
||||
if castObj.Spec.Service != nil { |
||||
c.rebuildAPIServiceCache() |
||||
} |
||||
c.queue.Add(castObj.Name) |
||||
} |
||||
|
||||
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string { |
||||
metadata, err := meta.Accessor(obj) |
||||
if err != nil { |
||||
utilruntime.HandleError(err) |
||||
return nil |
||||
} |
||||
c.cacheLock.RLock() |
||||
defer c.cacheLock.RUnlock() |
||||
return c.cache[metadata.GetNamespace()][metadata.GetName()] |
||||
} |
||||
|
||||
// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
|
||||
// (which will get processed an extra time - this doesn't matter),
|
||||
// and miss a newly relevant apiservice (which will get queued by the apiservice handler)
|
||||
func (c *AvailableConditionController) rebuildAPIServiceCache() { |
||||
apiServiceList, _ := c.apiServiceLister.List(labels.Everything()) |
||||
newCache := map[string]map[string][]string{} |
||||
for _, apiService := range apiServiceList { |
||||
if apiService.Spec.Service == nil { |
||||
continue |
||||
} |
||||
if newCache[apiService.Spec.Service.Namespace] == nil { |
||||
newCache[apiService.Spec.Service.Namespace] = map[string][]string{} |
||||
} |
||||
newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name] = append(newCache[apiService.Spec.Service.Namespace][apiService.Spec.Service.Name], apiService.Name) |
||||
} |
||||
|
||||
c.cacheLock.Lock() |
||||
defer c.cacheLock.Unlock() |
||||
c.cache = newCache |
||||
} |
||||
|
||||
// TODO, think of a way to avoid checking on every service manipulation
|
||||
|
||||
func (c *AvailableConditionController) addService(obj interface{}) { |
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v0alpha1.ExternalName)) { |
||||
c.queue.Add(apiService) |
||||
} |
||||
} |
||||
|
||||
func (c *AvailableConditionController) updateService(obj, _ interface{}) { |
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v0alpha1.ExternalName)) { |
||||
c.queue.Add(apiService) |
||||
} |
||||
} |
||||
|
||||
func (c *AvailableConditionController) deleteService(obj interface{}) { |
||||
castObj, ok := obj.(*v0alpha1.ExternalName) |
||||
if !ok { |
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown) |
||||
if !ok { |
||||
klog.Errorf("Couldn't get object from tombstone %#v", obj) |
||||
return |
||||
} |
||||
castObj, ok = tombstone.Obj.(*v0alpha1.ExternalName) |
||||
if !ok { |
||||
klog.Errorf("Tombstone contained object that is not expected %#v", obj) |
||||
return |
||||
} |
||||
} |
||||
for _, apiService := range c.getAPIServicesFor(castObj) { |
||||
c.queue.Add(apiService) |
||||
} |
||||
} |
@ -1,75 +0,0 @@ |
||||
package aggregator |
||||
|
||||
import ( |
||||
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" |
||||
genericapiserver "k8s.io/apiserver/pkg/server" |
||||
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" |
||||
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" |
||||
aggregatoropenapi "k8s.io/kube-aggregator/pkg/generated/openapi" |
||||
"k8s.io/kube-openapi/pkg/common" |
||||
|
||||
serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned" |
||||
informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions" |
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder" |
||||
) |
||||
|
||||
type RemoteService struct { |
||||
Group string `yaml:"group"` |
||||
Version string `yaml:"version"` |
||||
Host string `yaml:"host"` |
||||
Port int32 `yaml:"port"` |
||||
} |
||||
|
||||
type RemoteServicesConfig struct { |
||||
ExternalNamesNamespace string |
||||
InsecureSkipTLSVerify bool |
||||
CABundle []byte |
||||
Services []RemoteService |
||||
serviceClientSet *serviceclientset.Clientset |
||||
} |
||||
|
||||
type CustomExtraConfig struct { |
||||
DiscoveryOnlyProxyClientCertFile string |
||||
DiscoveryOnlyProxyClientKeyFile string |
||||
} |
||||
|
||||
type Config struct { |
||||
KubeAggregatorConfig *aggregatorapiserver.Config |
||||
CustomExtraConfig *CustomExtraConfig // this is temporary and will be removed once we have moved across newer auth rollout in cloud
|
||||
Informers informersv0alpha1.SharedInformerFactory |
||||
RemoteServicesConfig *RemoteServicesConfig |
||||
// Builders contain prerequisite api groups for aggregator to function correctly e.g. ExternalName
|
||||
// Since the main APIServer delegate supports storage implementations that intend to be multi-tenant
|
||||
// Aggregator builders that we don't intend to use multi-tenant storage are kept in aggregator's
|
||||
// Delegate, one which is configured explicitly to use file storage only
|
||||
Builders []builder.APIGroupBuilder |
||||
} |
||||
|
||||
// remoteServices may be nil when not using aggregation
|
||||
func NewConfig(aggregator *aggregatorapiserver.Config, customExtraConfig *CustomExtraConfig, informers informersv0alpha1.SharedInformerFactory, builders []builder.APIGroupBuilder, remoteServices *RemoteServicesConfig) *Config { |
||||
getMergedOpenAPIDefinitions := func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { |
||||
aggregatorAPIs := aggregatoropenapi.GetOpenAPIDefinitions(ref) |
||||
builderAPIs := builder.GetOpenAPIDefinitions(builders)(ref) |
||||
|
||||
for k, v := range builderAPIs { |
||||
aggregatorAPIs[k] = v |
||||
} |
||||
|
||||
return aggregatorAPIs |
||||
} |
||||
|
||||
// Add OpenAPI config, which depends on builders
|
||||
namer := openapinamer.NewDefinitionNamer(aggregatorscheme.Scheme) |
||||
aggregator.GenericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getMergedOpenAPIDefinitions, namer) |
||||
aggregator.GenericConfig.OpenAPIV3Config.Info.Title = "Kubernetes" |
||||
aggregator.GenericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getMergedOpenAPIDefinitions, namer) |
||||
aggregator.GenericConfig.OpenAPIConfig.Info.Title = "Kubernetes" |
||||
|
||||
return &Config{ |
||||
aggregator, |
||||
customExtraConfig, |
||||
informers, |
||||
remoteServices, |
||||
builders, |
||||
} |
||||
} |
@ -1,14 +0,0 @@ |
||||
# NOTE: dev-mode only and governed by presence of non-empty value for cfg["grafana-apiserver"]["remote_services_file"] |
||||
# List of sample multi-tenant services to aggregate on startup |
||||
- group: example.grafana.app |
||||
version: v0alpha1 |
||||
host: localhost |
||||
port: 7443 |
||||
- group: query.grafana.app |
||||
version: v0alpha1 |
||||
host: localhost |
||||
port: 7444 |
||||
- group: testdata.datasource.grafana.app |
||||
version: v0alpha1 |
||||
host: localhost |
||||
port: 7445 |
@ -1,15 +0,0 @@ |
||||
--- |
||||
apiVersion: apiregistration.k8s.io/v1 |
||||
kind: APIService |
||||
metadata: |
||||
name: v0alpha1.example.grafana.app |
||||
spec: |
||||
version: v0alpha1 |
||||
insecureSkipTLSVerify: true |
||||
group: example.grafana.app |
||||
groupPriorityMinimum: 1000 |
||||
versionPriority: 15 |
||||
service: |
||||
name: example-apiserver |
||||
namespace: grafana |
||||
port: 7443 |
@ -1,8 +0,0 @@ |
||||
--- |
||||
apiVersion: service.grafana.app/v0alpha1 |
||||
kind: ExternalName |
||||
metadata: |
||||
name: example-apiserver |
||||
namespace: grafana |
||||
spec: |
||||
host: localhost |
@ -1,170 +0,0 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package aggregator |
||||
|
||||
import ( |
||||
"sync" |
||||
|
||||
"k8s.io/component-base/metrics" |
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" |
||||
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" |
||||
) |
||||
|
||||
/* |
||||
* By default, all the following metrics are defined as falling under |
||||
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes)
|
||||
* |
||||
* Promoting the stability level of the metric is a responsibility of the component owner, since it |
||||
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with |
||||
* the metric stability policy. |
||||
*/ |
||||
var ( |
||||
unavailableGaugeDesc = metrics.NewDesc( |
||||
"st_aggregator_unavailable_apiservice", |
||||
"Gauge of Grafana APIServices which are marked as unavailable broken down by APIService name.", |
||||
[]string{"name"}, |
||||
nil, |
||||
metrics.ALPHA, |
||||
"", |
||||
) |
||||
) |
||||
|
||||
type Metrics struct { |
||||
unavailableCounter *metrics.CounterVec |
||||
|
||||
*availabilityCollector |
||||
} |
||||
|
||||
func newAvailabilityMetrics() *Metrics { |
||||
return &Metrics{ |
||||
unavailableCounter: metrics.NewCounterVec( |
||||
&metrics.CounterOpts{ |
||||
// These metrics are registered in the main kube-aggregator package as well, prefixing with single-tenant (ST) to avoid
|
||||
// "duplicate metrics collector registration attempted" in https://github.com/prometheus/client_golang
|
||||
// a more descriptive prefix is already added for apiserver metrics during scraping in cloud and didn't want
|
||||
// to double a word by using a word such as "grafana" here
|
||||
Name: "st_aggregator_unavailable_apiservice_total", |
||||
Help: "Counter of Grafana APIServices which are marked as unavailable broken down by APIService name and reason.", |
||||
StabilityLevel: metrics.ALPHA, |
||||
}, |
||||
[]string{"name", "reason"}, |
||||
), |
||||
availabilityCollector: newAvailabilityCollector(), |
||||
} |
||||
} |
||||
|
||||
// Register registers apiservice availability metrics.
|
||||
func (m *Metrics) Register( |
||||
registrationFunc func(metrics.Registerable) error, |
||||
customRegistrationFunc func(metrics.StableCollector) error, |
||||
) error { |
||||
err := registrationFunc(m.unavailableCounter) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = customRegistrationFunc(m.availabilityCollector) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// UnavailableCounter returns a counter to track apiservices marked as unavailable.
|
||||
func (m *Metrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric { |
||||
return m.unavailableCounter.WithLabelValues(apiServiceName, reason) |
||||
} |
||||
|
||||
type availabilityCollector struct { |
||||
metrics.BaseStableCollector |
||||
|
||||
mtx sync.RWMutex |
||||
availabilities map[string]bool |
||||
} |
||||
|
||||
// SetUnavailableGauge set the metrics so that it reflect the current state based on availability of the given service
|
||||
func (m *Metrics) SetUnavailableGauge(newAPIService *apiregistrationv1.APIService) { |
||||
if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) { |
||||
m.SetAPIServiceAvailable(newAPIService.Name) |
||||
return |
||||
} |
||||
|
||||
m.SetAPIServiceUnavailable(newAPIService.Name) |
||||
} |
||||
|
||||
// SetUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
|
||||
func (m *Metrics) SetUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) { |
||||
wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available) |
||||
isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) |
||||
statusChanged := isAvailable != wasAvailable |
||||
|
||||
if statusChanged && !isAvailable { |
||||
reason := "UnknownReason" |
||||
if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil { |
||||
reason = newCondition.Reason |
||||
} |
||||
m.UnavailableCounter(newAPIService.Name, reason).Inc() |
||||
} |
||||
} |
||||
|
||||
// Check if apiServiceStatusCollector implements necessary interface.
|
||||
var _ metrics.StableCollector = &availabilityCollector{} |
||||
|
||||
func newAvailabilityCollector() *availabilityCollector { |
||||
return &availabilityCollector{ |
||||
availabilities: make(map[string]bool), |
||||
} |
||||
} |
||||
|
||||
// DescribeWithStability implements the metrics.StableCollector interface.
|
||||
func (c *availabilityCollector) DescribeWithStability(ch chan<- *metrics.Desc) { |
||||
ch <- unavailableGaugeDesc |
||||
} |
||||
|
||||
// CollectWithStability implements the metrics.StableCollector interface.
|
||||
func (c *availabilityCollector) CollectWithStability(ch chan<- metrics.Metric) { |
||||
c.mtx.RLock() |
||||
defer c.mtx.RUnlock() |
||||
|
||||
for apiServiceName, isAvailable := range c.availabilities { |
||||
gaugeValue := 1.0 |
||||
if isAvailable { |
||||
gaugeValue = 0.0 |
||||
} |
||||
ch <- metrics.NewLazyConstMetric( |
||||
unavailableGaugeDesc, |
||||
metrics.GaugeValue, |
||||
gaugeValue, |
||||
apiServiceName, |
||||
) |
||||
} |
||||
} |
||||
|
||||
// SetAPIServiceAvailable sets the given apiservice availability gauge to available.
|
||||
func (c *availabilityCollector) SetAPIServiceAvailable(apiServiceKey string) { |
||||
c.setAPIServiceAvailability(apiServiceKey, true) |
||||
} |
||||
|
||||
// SetAPIServiceUnavailable sets the given apiservice availability gauge to unavailable.
|
||||
func (c *availabilityCollector) SetAPIServiceUnavailable(apiServiceKey string) { |
||||
c.setAPIServiceAvailability(apiServiceKey, false) |
||||
} |
||||
|
||||
func (c *availabilityCollector) setAPIServiceAvailability(apiServiceKey string, availability bool) { |
||||
c.mtx.Lock() |
||||
defer c.mtx.Unlock() |
||||
|
||||
c.availabilities[apiServiceKey] = availability |
||||
} |
||||
|
||||
// ForgetAPIService removes the availability gauge of the given apiservice.
|
||||
func (c *availabilityCollector) ForgetAPIService(apiServiceKey string) { |
||||
c.mtx.Lock() |
||||
defer c.mtx.Unlock() |
||||
|
||||
delete(c.availabilities, apiServiceKey) |
||||
} |
@ -1,32 +0,0 @@ |
||||
package aggregator |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net" |
||||
"net/url" |
||||
|
||||
"k8s.io/kube-aggregator/pkg/apiserver" |
||||
|
||||
servicelistersv0alpha1 "github.com/grafana/grafana/pkg/generated/listers/service/v0alpha1" |
||||
) |
||||
|
||||
func NewExternalNameResolver(externalNames servicelistersv0alpha1.ExternalNameLister) apiserver.ServiceResolver { |
||||
return &externalNameResolver{ |
||||
externalNames: externalNames, |
||||
} |
||||
} |
||||
|
||||
type externalNameResolver struct { |
||||
externalNames servicelistersv0alpha1.ExternalNameLister |
||||
} |
||||
|
||||
func (r *externalNameResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) { |
||||
extName, err := r.externalNames.ExternalNames(namespace).Get(name) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &url.URL{ |
||||
Scheme: "https", |
||||
Host: net.JoinHostPort(extName.Spec.Host, fmt.Sprintf("%d", port)), |
||||
}, nil |
||||
} |
@ -0,0 +1,26 @@ |
||||
package aggregatorrunner |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
genericapiserver "k8s.io/apiserver/pkg/server" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder" |
||||
"github.com/grafana/grafana/pkg/services/apiserver/options" |
||||
) |
||||
|
||||
type NoopAggregatorConfigurator struct { |
||||
} |
||||
|
||||
func (n NoopAggregatorConfigurator) Configure(opts *options.Options, config *genericapiserver.RecommendedConfig, delegateAPIServer genericapiserver.DelegationTarget, scheme *runtime.Scheme, builders []builder.APIGroupBuilder) (*genericapiserver.GenericAPIServer, error) { |
||||
return nil, nil |
||||
} |
||||
|
||||
func (n NoopAggregatorConfigurator) Run(ctx context.Context, transport *options.RoundTripperFunc, stoppedCh chan error) (*genericapiserver.GenericAPIServer, error) { |
||||
return nil, nil |
||||
} |
||||
|
||||
func ProvideNoopAggregatorConfigurator() AggregatorRunner { |
||||
return &NoopAggregatorConfigurator{} |
||||
} |
@ -0,0 +1,24 @@ |
||||
package aggregatorrunner |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
genericapiserver "k8s.io/apiserver/pkg/server" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder" |
||||
"github.com/grafana/grafana/pkg/services/apiserver/options" |
||||
) |
||||
|
||||
// AggregatorRunner is an interface for running an aggregator inside the same generic apiserver delegate chain
|
||||
type AggregatorRunner interface { |
||||
// Configure is called to configure the component and returns the delegate for further chaining.
|
||||
Configure(opts *options.Options, |
||||
config *genericapiserver.RecommendedConfig, |
||||
delegateAPIServer genericapiserver.DelegationTarget, |
||||
scheme *runtime.Scheme, |
||||
builders []builder.APIGroupBuilder) (*genericapiserver.GenericAPIServer, error) |
||||
|
||||
// Run starts the complete apiserver chain, expects it executes any logic inside a goroutine and doesn't block. Returns the running server.
|
||||
Run(ctx context.Context, transport *options.RoundTripperFunc, stoppedCh chan error) (*genericapiserver.GenericAPIServer, error) |
||||
} |
@ -1,114 +0,0 @@ |
||||
package options |
||||
|
||||
import ( |
||||
"github.com/spf13/pflag" |
||||
v1 "k8s.io/api/apps/v1" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
genericfeatures "k8s.io/apiserver/pkg/features" |
||||
genericapiserver "k8s.io/apiserver/pkg/server" |
||||
"k8s.io/apiserver/pkg/server/options" |
||||
"k8s.io/apiserver/pkg/server/resourceconfig" |
||||
utilfeature "k8s.io/apiserver/pkg/util/feature" |
||||
apiregistrationv1beta1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" |
||||
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" |
||||
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" |
||||
|
||||
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1" |
||||
"github.com/grafana/grafana/pkg/storage/unified/apistore" |
||||
) |
||||
|
||||
// KubeAggregatorOptions contains the state for the aggregator apiserver
|
||||
type KubeAggregatorOptions struct { |
||||
AlternateDNS []string |
||||
ProxyClientCertFile string |
||||
ProxyClientKeyFile string |
||||
LegacyClientCertAuth bool |
||||
RemoteServicesFile string |
||||
APIServiceCABundleFile string |
||||
} |
||||
|
||||
func NewAggregatorServerOptions() *KubeAggregatorOptions { |
||||
return &KubeAggregatorOptions{} |
||||
} |
||||
|
||||
func (o *KubeAggregatorOptions) AddFlags(fs *pflag.FlagSet) { |
||||
if o == nil { |
||||
return |
||||
} |
||||
|
||||
// the following two config variables are slated to be faded out in cloud deployments after which
|
||||
// their scope is restricted to local development and non Grafana Cloud use-cases only
|
||||
// leaving them unspecified leads to graceful behavior in grafana-aggregator
|
||||
// and would work for configurations where the aggregated servers and aggregator are auth-less and trusting
|
||||
// of each other
|
||||
fs.StringVar(&o.ProxyClientCertFile, "proxy-client-cert-file", o.ProxyClientCertFile, |
||||
"path to proxy client cert file") |
||||
|
||||
fs.StringVar(&o.ProxyClientKeyFile, "proxy-client-key-file", o.ProxyClientKeyFile, |
||||
"path to proxy client key file") |
||||
|
||||
fs.BoolVar(&o.LegacyClientCertAuth, "legacy-client-cert-auth", true, |
||||
"whether to use legacy client cert auth") |
||||
} |
||||
|
||||
func (o *KubeAggregatorOptions) Validate() []error { |
||||
if o == nil { |
||||
return nil |
||||
} |
||||
|
||||
// TODO: do we need to validate anything here?
|
||||
return nil |
||||
} |
||||
|
||||
func (o *KubeAggregatorOptions) ApplyTo(aggregatorConfig *aggregatorapiserver.Config, etcdOpts *options.EtcdOptions) error { |
||||
genericConfig := aggregatorConfig.GenericConfig |
||||
|
||||
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} |
||||
genericConfig.RESTOptionsGetter = nil |
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && |
||||
utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { |
||||
// Add StorageVersionPrecondition handler to aggregator-apiserver.
|
||||
// The handler will block write requests to built-in resources until the
|
||||
// target resources' storage versions are up-to-date.
|
||||
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition |
||||
} |
||||
|
||||
// copy the etcd options so we don't mutate originals.
|
||||
// we assume that the etcd options have been completed already. avoid messing with anything outside
|
||||
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
|
||||
etcdOptions := *etcdOpts |
||||
etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, |
||||
apiregistrationv1beta1.SchemeGroupVersion, |
||||
servicev0alpha1.SchemeGroupVersion) |
||||
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, |
||||
schema.GroupKind{Group: apiregistrationv1beta1.GroupName}, |
||||
schema.GroupKind{Group: servicev0alpha1.GROUP}) |
||||
etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
|
||||
if err := etcdOptions.ApplyTo(&genericConfig.Config); err != nil { |
||||
return err |
||||
} |
||||
// override the RESTOptionsGetter to use the in memory storage options
|
||||
restOptionsGetter, err := apistore.NewRESTOptionsGetterMemory(etcdOptions.StorageConfig) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
aggregatorConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter |
||||
|
||||
// prevent generic API server from installing the OpenAPI handler. Aggregator server has its own customized OpenAPI handler.
|
||||
genericConfig.SkipOpenAPIInstallation = true |
||||
mergedResourceConfig, err := resourceconfig.MergeAPIResourceConfigs(aggregatorapiserver.DefaultAPIResourceConfigSource(), nil, aggregatorscheme.Scheme) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
genericConfig.MergedResourceConfig = mergedResourceConfig |
||||
|
||||
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} |
||||
|
||||
// These hooks use v1 informers, which are not available in the grafana aggregator.
|
||||
genericConfig.DisabledPostStartHooks = genericConfig.DisabledPostStartHooks.Insert("start-kube-aggregator-informers") |
||||
genericConfig.DisabledPostStartHooks = genericConfig.DisabledPostStartHooks.Insert("apiservice-status-local-available-controller") |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,21 @@ |
||||
package options |
||||
|
||||
import ( |
||||
"net/http" |
||||
) |
||||
|
||||
// NOTE: both dataplane aggregator and kubernetes aggregator (when enterprise is linked) have logic around
|
||||
// setting this RoundTripper as ready, however, kubernetes aggregator part is skipped naturally,
|
||||
// given it is invoked as part of the delegate chain headed by the dataplane aggregator, and not through
|
||||
// its own Run method.
|
||||
type RoundTripperFunc struct { |
||||
Ready chan struct{} |
||||
Fn func(req *http.Request) (*http.Response, error) |
||||
} |
||||
|
||||
func (f *RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { |
||||
if f.Fn == nil { |
||||
<-f.Ready |
||||
} |
||||
return f.Fn(req) |
||||
} |
|
Loading…
Reference in new issue