@ -12,13 +12,18 @@
package aggregator
import (
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1"
"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
@ -37,19 +42,68 @@ import (
apiregistrationInformers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
servicev0alpha1applyconfiguration "github.com/grafana/grafana/pkg/generated/applyconfiguration/service/v0alpha1"
serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned"
informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions"
"github.com/grafana/grafana/pkg/services/apiserver/options"
)
func CreateAggregatorConfig ( commandOptions * options . Options , sharedConfig genericapiserver . RecommendedConfig ) ( * aggregatorapiserver . Config , informersv0alpha1 . SharedInformerFactory , error ) {
func readCABundlePEM ( path string , devMode bool ) ( [ ] byte , error ) {
if devMode {
return nil , nil
}
// We can ignore the gosec G304 warning on this one because `path` comes
// from Grafana configuration (commandOptions.AggregatorOptions.APIServiceCABundleFile)
//nolint:gosec
f , err := os . Open ( path )
if err != nil {
return nil , err
}
defer func ( ) {
if err := f . Close ( ) ; err != nil {
klog . Errorf ( "error closing remote services file: %s" , err )
}
} ( )
return io . ReadAll ( f )
}
func readRemoteServices ( path string ) ( [ ] RemoteService , error ) {
// We can ignore the gosec G304 warning on this one because `path` comes
// from Grafana configuration (commandOptions.AggregatorOptions.RemoteServicesFile)
//nolint:gosec
f , err := os . Open ( path )
if err != nil {
return nil , err
}
defer func ( ) {
if err := f . Close ( ) ; err != nil {
klog . Errorf ( "error closing remote services file: %s" , err )
}
} ( )
rawRemoteServices , err := io . ReadAll ( f )
if err != nil {
return nil , err
}
remoteServices := make ( [ ] RemoteService , 0 )
if err := yaml . Unmarshal ( rawRemoteServices , & remoteServices ) ; err != nil {
return nil , err
}
return remoteServices , nil
}
func CreateAggregatorConfig ( commandOptions * options . Options , sharedConfig genericapiserver . RecommendedConfig , externalNamesNamespace string ) ( * Config , error ) {
// Create a fake clientset and informers for the k8s v1 API group.
// These are not used in grafana's aggregator because v1 APIs are not available.
fakev1Informers := informers . NewSharedInformerFactory ( fake . NewSimpleClientset ( ) , 10 * time . Minute )
serviceClient , err := serviceclientset . NewForConfig ( sharedConfig . LoopbackClientConfig )
if err != nil {
return nil , nil , err
return nil , err
}
sharedInformerFactory := informersv0alpha1 . NewSharedInformerFactory (
serviceClient ,
@ -74,13 +128,35 @@ func CreateAggregatorConfig(commandOptions *options.Options, sharedConfig generi
}
if err := commandOptions . AggregatorOptions . ApplyTo ( aggregatorConfig , commandOptions . RecommendedOptions . Etcd , commandOptions . StorageOptions . DataPath ) ; err != nil {
return nil , nil , err
return nil , err
}
// Exit early, if no remote services file is configured
if commandOptions . AggregatorOptions . RemoteServicesFile == "" {
return NewConfig ( aggregatorConfig , sharedInformerFactory , nil ) , nil
}
return aggregatorConfig , sharedInformerFactory , nil
caBundlePEM , err := readCABundlePEM ( commandOptions . AggregatorOptions . APIServiceCABundleFile , commandOptions . ExtraOptions . DevMode )
if err != nil {
return nil , err
}
remoteServices , err := readRemoteServices ( commandOptions . AggregatorOptions . RemoteServicesFile )
if err != nil {
return nil , err
}
remoteServicesConfig := & RemoteServicesConfig {
InsecureSkipTLSVerify : commandOptions . ExtraOptions . DevMode ,
ExternalNamesNamespace : externalNamesNamespace ,
CABundle : caBundlePEM ,
Services : remoteServices ,
serviceClientSet : serviceClient ,
}
return NewConfig ( aggregatorConfig , sharedInformerFactory , remoteServicesConfig ) , nil
}
func CreateAggregatorServer ( aggregatorConfig * aggregatorapiserver . Config , sharedInformerFactory informersv0alpha1 . SharedInformerFactory , delegateAPIServer genericapiserver . DelegationTarget ) ( * aggregatorapiserver . APIAggregator , error ) {
func CreateAggregatorServer ( aggregatorConfig * aggregatorapiserver . Config , sharedInformerFactory informersv0alpha1 . SharedInformerFactory , remoteServicesConfig * RemoteServicesConfig , delegateAPIServer genericapiserver . DelegationTarget ) ( * aggregatorapiserver . APIAggregator , error ) {
completedConfig := aggregatorConfig . Complete ( )
aggregatorServer , err := completedConfig . NewWithDelegate ( delegateAPIServer )
if err != nil {
@ -111,6 +187,27 @@ func CreateAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, shared
return nil , err
}
if remoteServicesConfig != nil {
addRemoteAPIServicesToRegister ( remoteServicesConfig , autoRegistrationController )
externalNames := getRemoteExternalNamesToRegister ( remoteServicesConfig )
err = aggregatorServer . GenericAPIServer . AddPostStartHook ( "grafana-apiserver-remote-autoregistration" , func ( _ genericapiserver . PostStartHookContext ) error {
namespacedClient := remoteServicesConfig . serviceClientSet . ServiceV0alpha1 ( ) . ExternalNames ( remoteServicesConfig . ExternalNamesNamespace )
for _ , externalName := range externalNames {
_ , err := namespacedClient . Apply ( context . Background ( ) , externalName , metav1 . ApplyOptions {
FieldManager : "grafana-aggregator" ,
Force : true ,
} )
if err != nil {
return err
}
}
return nil
} )
if err != nil {
return nil , err
}
}
err = aggregatorServer . GenericAPIServer . AddBootSequenceHealthChecks (
makeAPIServiceAvailableHealthCheck (
"autoregister-completion" ,
@ -240,6 +337,51 @@ var APIVersionPriorities = map[schema.GroupVersion]Priority{
// Version can be set to 9 (to have space around) for a new group.
}
func addRemoteAPIServicesToRegister ( config * RemoteServicesConfig , registration autoregister . AutoAPIServiceRegistration ) {
for i , service := range config . Services {
port := service . Port
apiService := & v1 . APIService {
ObjectMeta : metav1 . ObjectMeta { Name : service . Version + "." + service . Group } ,
Spec : v1 . APIServiceSpec {
Group : service . Group ,
Version : service . Version ,
InsecureSkipTLSVerify : config . InsecureSkipTLSVerify ,
CABundle : config . CABundle ,
// TODO: Group priority minimum of 1000 more than for local services, figure out a better story
// when we have multiple versions, potentially running in heterogeneous ways (local and remote)
GroupPriorityMinimum : 16000 ,
VersionPriority : 1 + int32 ( i ) ,
Service : & v1 . ServiceReference {
Name : service . Version + "." + service . Group ,
Namespace : config . ExternalNamesNamespace ,
Port : & port ,
} ,
} ,
}
registration . AddAPIServiceToSyncOnStart ( apiService )
}
}
func getRemoteExternalNamesToRegister ( config * RemoteServicesConfig ) [ ] * servicev0alpha1applyconfiguration . ExternalNameApplyConfiguration {
externalNames := make ( [ ] * servicev0alpha1applyconfiguration . ExternalNameApplyConfiguration , 0 )
for _ , service := range config . Services {
host := service . Host
name := service . Version + "." + service . Group
externalName := & servicev0alpha1applyconfiguration . ExternalNameApplyConfiguration { }
externalName . WithAPIVersion ( servicev0alpha1 . SchemeGroupVersion . String ( ) )
externalName . WithKind ( "ExternalName" )
externalName . WithName ( name )
externalName . WithSpec ( & servicev0alpha1applyconfiguration . ExternalNameSpecApplyConfiguration {
Host : & host ,
} )
externalNames = append ( externalNames , externalName )
}
return externalNames
}
func apiServicesToRegister ( delegateAPIServer genericapiserver . DelegationTarget , registration autoregister . AutoAPIServiceRegistration ) [ ] * v1 . APIService {
apiServices := [ ] * v1 . APIService { }