Dashboards: Add k8s fallback client (#103404)

pull/103485/head
Todd Treece 4 months ago committed by GitHub
parent b329b78ef6
commit 427715b070
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 157
      pkg/services/dashboards/service/client/client.go
  2. 357
      pkg/services/dashboards/service/client/client_test.go
  3. 20
      pkg/services/dashboards/service/client/metrics.go
  4. 9
      pkg/services/dashboards/service/dashboard_service.go

@ -0,0 +1,157 @@
package client
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel/attribute"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
dashboardv1alpha1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1alpha1"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/apiserver"
"github.com/grafana/grafana/pkg/services/apiserver/client"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/search/sort"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/prometheus/client_golang/prometheus"
)
// K8sClientFactory returns the client.K8sHandler to use for the requested
// dashboard API version (for example "v1alpha1").
type K8sClientFactory func(ctx context.Context, version string) client.K8sHandler
// K8sClientWithFallback is a client.K8sHandler whose Get re-fetches a
// dashboard through a client for its stored version when the object returned
// by the embedded handler reports a failed conversion.
type K8sClientWithFallback struct {
// Embedded handler used for the first Get attempt; every K8sHandler method
// that this type does not define itself is promoted from it.
client.K8sHandler
// newClientFunc supplies the handler for the stored version on fallback.
newClientFunc K8sClientFactory
// metrics holds the counter incremented on every fallback.
metrics *k8sClientMetrics
// log receives an info entry each time a fallback happens.
log log.Logger
}
// NewK8sClientWithFallback wires up a K8sClientWithFallback.
//
// It builds the per-version client factory from the supplied dependencies,
// uses that factory to create the embedded handler for
// dashboardv1alpha1.VERSION right away (with a background context), and
// creates the fallback metrics against reg.
func NewK8sClientWithFallback(
	cfg *setting.Cfg,
	restConfigProvider apiserver.RestConfigProvider,
	dashboardStore dashboards.Store,
	userService user.Service,
	resourceClient resource.ResourceClient,
	sorter sort.Service,
	dual dualwrite.Service,
	reg prometheus.Registerer,
) *K8sClientWithFallback {
	factory := newK8sClientFactory(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual)

	// Fields are filled in one at a time, in the same order the dependencies
	// are constructed: default handler, then metrics, then logger.
	fallbackClient := &K8sClientWithFallback{newClientFunc: factory}
	fallbackClient.K8sHandler = factory(context.Background(), dashboardv1alpha1.VERSION)
	fallbackClient.metrics = newK8sClientMetrics(reg)
	fallbackClient.log = log.New("dashboards-k8s-client")

	return fallbackClient
}
// Get fetches the dashboard through the embedded handler and, when the
// returned object reports a failed conversion under status.conversion,
// fetches it again through a client for the reported stored version.
//
// Parameters are passed unchanged to both requests. Every fallback is logged,
// counted in the fallback metric (labelled with the stored version) and
// reflected in the span attributes.
//
// Errors from either request are recorded on the span and returned as
// received; on error the returned object is nil. A failed conversion that
// does not name a stored version is reported as an error, because there is no
// version to build a fallback client for.
func (h *K8sClientWithFallback) Get(ctx context.Context, name string, orgID int64, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) {
	spanCtx, span := tracing.Start(ctx, "versionFallbackK8sHandler.Get")
	defer span.End()

	span.SetAttributes(
		attribute.String("dashboard.metadata.name", name),
		attribute.Int64("org.id", orgID),
		attribute.Bool("fallback", false),
	)

	span.AddEvent("v1alpha1 Get")
	result, err := h.K8sHandler.Get(spanCtx, name, orgID, options, subresources...)
	if err != nil {
		return nil, tracing.Error(span, err)
	}

	failed, storedVersion, conversionErr := getConversionStatus(result)
	if !failed {
		// if the conversion did not fail, there is no need to fallback.
		return result, nil
	}

	// Without a stored version the factory would be asked for a client with an
	// empty API version; fail explicitly instead of issuing that request.
	if storedVersion == "" {
		return nil, tracing.Error(span, fmt.Errorf("conversion of dashboard %q failed and no stored version was reported (conversion error: %q)", name, conversionErr))
	}

	h.log.Info("falling back to stored version", "name", name, "storedVersion", storedVersion, "conversionErr", conversionErr)
	h.metrics.fallbackCounter.WithLabelValues(storedVersion).Inc()

	span.SetAttributes(
		attribute.Bool("fallback", true),
		attribute.String("fallback.stored_version", storedVersion),
		attribute.String("fallback.conversion_error", conversionErr),
	)

	span.AddEvent(fmt.Sprintf("%s Get", storedVersion))
	fallbackResult, err := h.newClientFunc(spanCtx, storedVersion).Get(spanCtx, name, orgID, options, subresources...)
	if err != nil {
		// Record the fallback failure on the span, same as the first request.
		return nil, tracing.Error(span, err)
	}
	return fallbackResult, nil
}
// getConversionStatus reads the conversion result stored under
// status.conversion of obj.
//
// It returns the values of the "failed", "storedVersion" and "error" fields.
// All three are zero values when obj is nil or when status.conversion is
// missing or is not a map; an individual field that is missing or has the
// wrong type is returned as its zero value.
func getConversionStatus(obj *unstructured.Unstructured) (failed bool, storedVersion string, conversionErr string) {
	// A nil object carries no status; guard before dereferencing obj.Object.
	if obj == nil {
		return false, "", ""
	}

	// Lookup errors only signal a type mismatch along the path, which is
	// treated the same as the field being absent, so they are ignored here
	// and in the field reads below.
	conversionStatus, found, _ := unstructured.NestedMap(obj.Object, "status", "conversion")
	if !found {
		return false, "", ""
	}

	failed, _, _ = unstructured.NestedBool(conversionStatus, "failed")
	storedVersion, _, _ = unstructured.NestedString(conversionStatus, "storedVersion")
	conversionErr, _, _ = unstructured.NestedString(conversionStatus, "error")
	return failed, storedVersion, conversionErr
}
// newK8sClientFactory returns a K8sClientFactory that creates one dashboards
// handler per API version and keeps it in an in-memory map for later calls.
//
// The map is first consulted under a read lock. On a miss the write lock is
// taken and the map is consulted again before a handler is created, so that a
// handler stored by another goroutine in between is reused.
func newK8sClientFactory(
	cfg *setting.Cfg,
	restConfigProvider apiserver.RestConfigProvider,
	dashboardStore dashboards.Store,
	userService user.Service,
	resourceClient resource.ResourceClient,
	sorter sort.Service,
	dual dualwrite.Service,
) K8sClientFactory {
	var (
		mu       sync.RWMutex
		handlers = map[string]client.K8sHandler{}
	)

	return func(ctx context.Context, version string) client.K8sHandler {
		_, span := tracing.Start(ctx, "k8sClientFactory.GetClient",
			attribute.String("group", dashboardv1alpha1.GROUP),
			attribute.String("version", version),
			attribute.String("resource", "dashboards"),
		)
		defer span.End()

		// Fast path: shared lock only.
		mu.RLock()
		handler, ok := handlers[version]
		mu.RUnlock()
		if ok {
			span.AddEvent("Client found in cache")
			return handler
		}

		mu.Lock()
		defer mu.Unlock()

		// Look again now that the write lock is held.
		if handler, ok = handlers[version]; ok {
			span.AddEvent("Client found in cache after lock")
			return handler
		}

		span.AddEvent("Creating new client")
		handler = client.NewK8sHandler(
			dual,
			request.GetNamespaceMapper(cfg),
			schema.GroupVersionResource{
				Group:    dashboardv1alpha1.GROUP,
				Version:  version,
				Resource: "dashboards",
			},
			restConfigProvider.GetRestConfig,
			dashboardStore,
			userService,
			resourceClient,
			sorter,
		)
		handlers[version] = handler
		return handler
	}
}

@ -0,0 +1,357 @@
package client
import (
"context"
"errors"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
dashboardv1alpha1 "github.com/grafana/grafana/apps/dashboard/pkg/apis/dashboard/v1alpha1"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/apiserver/client"
)
// testSetup bundles the handler under test with the mocks and counters the
// tests use to inspect it.
type testSetup struct {
// handler is the K8sClientWithFallback under test.
handler *K8sClientWithFallback
// mockClientV1Alpha1 is the embedded handler and is also what the factory returns for the v1alpha1 version.
mockClientV1Alpha1 *client.MockK8sHandler
// mockClientV2Alpha1 is what the factory returns for "v2alpha1".
mockClientV2Alpha1 *client.MockK8sHandler
// mockMetrics is backed by a fresh registry created for each setup.
mockMetrics *k8sClientMetrics
// mockFactoryCalls counts factory invocations per requested version.
mockFactoryCalls map[string]int
t *testing.T
}
// setupTest creates a K8sClientWithFallback backed by mock handlers.
//
// The embedded handler is the v1alpha1 mock. The factory records every call
// per version, serves the v2alpha1 and v1alpha1 mocks, and fails the test for
// any other version.
func setupTest(t *testing.T) *testSetup {
	setup := &testSetup{
		mockClientV1Alpha1: &client.MockK8sHandler{},
		mockClientV2Alpha1: &client.MockK8sHandler{},
		mockMetrics:        newK8sClientMetrics(prometheus.NewRegistry()),
		mockFactoryCalls:   make(map[string]int),
		t:                  t,
	}

	factory := func(ctx context.Context, version string) client.K8sHandler {
		setup.mockFactoryCalls[version]++
		switch {
		case version == "v2alpha1":
			return setup.mockClientV2Alpha1
		case version == dashboardv1alpha1.VERSION:
			return setup.mockClientV1Alpha1
		default:
			t.Fatalf("Unexpected call to newClientFunc with version %s", version)
			return nil
		}
	}

	setup.handler = &K8sClientWithFallback{
		K8sHandler:    setup.mockClientV1Alpha1,
		newClientFunc: factory,
		log:           log.New("test"),
		metrics:       setup.mockMetrics,
	}
	return setup
}
// TestK8sHandlerWithFallback_Get exercises K8sClientWithFallback.Get against
// mock handlers: the no-fallback path, a successful fallback, an error from
// the first request, and an error from the fallback request.
func TestK8sHandlerWithFallback_Get(t *testing.T) {
// The object has a status but no status.conversion, so the result of the
// first request is returned and the factory is never invoked.
t.Run("Get without fallback", func(t *testing.T) {
setup := setupTest(t)
ctx := context.Background()
name := "test-dashboard"
orgID := int64(1)
options := metav1.GetOptions{}
expectedResult := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": name,
},
"status": map[string]interface{}{
"someOtherStatus": "ok",
},
},
}
setup.mockClientV1Alpha1.On("Get", mock.Anything, name, orgID, options, mock.Anything).Return(expectedResult, nil).Once()
result, err := setup.handler.Get(ctx, name, orgID, options)
require.NoError(t, err)
require.Equal(t, expectedResult, result)
require.Equal(t, 0, len(setup.mockFactoryCalls), "Factory should not be called for non-fallback case")
setup.mockClientV1Alpha1.AssertExpectations(t)
setup.mockClientV2Alpha1.AssertExpectations(t)
})
// The first response reports a failed conversion with storedVersion
// v2alpha1; the result of the v2alpha1 mock must be returned and the
// factory must have been asked for v2alpha1 exactly once.
t.Run("Get with fallback due to conversion error", func(t *testing.T) {
setup := setupTest(t)
ctx := context.Background()
name := "test-dashboard-fallback"
orgID := int64(2)
options := metav1.GetOptions{ResourceVersion: "123"}
storedVersion := "v2alpha1"
conversionErr := "failed to convert"
v1alpha1Result := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": name,
},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"failed": true,
"storedVersion": storedVersion,
"error": conversionErr,
},
},
},
}
expectedResultFallback := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "dashboard/v2alpha1",
"kind": "Dashboard",
"metadata": map[string]interface{}{
"name": name,
},
},
}
setup.mockClientV1Alpha1.On("Get", mock.Anything, name, orgID, options, mock.Anything).Return(v1alpha1Result, nil).Once()
setup.mockClientV2Alpha1.On("Get", mock.Anything, name, orgID, options, mock.Anything).Return(expectedResultFallback, nil).Once()
result, err := setup.handler.Get(ctx, name, orgID, options)
require.NoError(t, err)
require.Equal(t, expectedResultFallback, result)
require.Equal(t, 1, setup.mockFactoryCalls["v2alpha1"], "Factory should be called once with v2alpha1")
setup.mockClientV1Alpha1.AssertExpectations(t)
setup.mockClientV2Alpha1.AssertExpectations(t)
})
// An error from the first request is returned as the same error value and
// no fallback is attempted.
t.Run("Get initial error", func(t *testing.T) {
setup := setupTest(t)
ctx := context.Background()
name := "test-dashboard-error"
orgID := int64(3)
options := metav1.GetOptions{}
expectedErr := errors.New("initial get failed")
setup.mockClientV1Alpha1.On("Get", mock.Anything, name, orgID, options, mock.Anything).Return(nil, expectedErr).Once()
_, err := setup.handler.Get(ctx, name, orgID, options)
require.Error(t, err)
require.Equal(t, expectedErr, err)
require.Equal(t, 0, len(setup.mockFactoryCalls), "Factory should not be called for error case")
setup.mockClientV1Alpha1.AssertExpectations(t)
setup.mockClientV2Alpha1.AssertExpectations(t)
})
// The fallback is triggered, but the v2alpha1 request fails; that error
// must be returned as the same error value.
t.Run("Get with fallback fails", func(t *testing.T) {
setup := setupTest(t)
ctx := context.Background()
name := "test-dashboard-fallback-error"
orgID := int64(4)
options := metav1.GetOptions{}
storedVersion := "v2alpha1"
conversionErr := "failed to convert again"
fallbackErr := errors.New("fallback get failed")
v1alpha1Result := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"name": name,
},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"failed": true,
"storedVersion": storedVersion,
"error": conversionErr,
},
},
},
}
setup.mockClientV1Alpha1.On("Get", mock.Anything, name, orgID, options, mock.Anything).Return(v1alpha1Result, nil).Once()
setup.mockClientV2Alpha1.On("Get", mock.Anything, name, orgID, options, mock.Anything).Return(nil, fallbackErr).Once()
_, err := setup.handler.Get(ctx, name, orgID, options)
require.Error(t, err)
require.Equal(t, fallbackErr, err)
require.Equal(t, 1, setup.mockFactoryCalls["v2alpha1"], "Factory should be called once with v2alpha1")
setup.mockClientV1Alpha1.AssertExpectations(t)
setup.mockClientV2Alpha1.AssertExpectations(t)
})
}
// TestGetConversionStatus is a table-driven test of getConversionStatus. It
// covers missing, malformed and partially populated status.conversion maps.
func TestGetConversionStatus(t *testing.T) {
tests := []struct {
name string
obj *unstructured.Unstructured
expectedFailed bool
expectedStoredVersion string
expectedError string
}{
{
name: "No status field",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
}},
expectedFailed: false,
expectedStoredVersion: "",
expectedError: "",
},
{
name: "Status field, but no conversion field",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{"someOtherStatus": "ok"},
}},
expectedFailed: false,
expectedStoredVersion: "",
expectedError: "",
},
{
name: "Conversion field, failed=true, with storedVersion and error",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"failed": true,
"storedVersion": "v2alpha1",
"error": "conversion failed",
},
},
}},
expectedFailed: true,
expectedStoredVersion: "v2alpha1",
expectedError: "conversion failed",
},
{
name: "Conversion field, failed=false",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"failed": false,
"storedVersion": "v1alpha1",
"error": "",
},
},
}},
expectedFailed: false,
expectedStoredVersion: "v1alpha1",
expectedError: "",
},
{
name: "Conversion field, missing failed (defaults to false)",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"storedVersion": "v1alpha1",
"error": "",
},
},
}},
expectedFailed: false,
expectedStoredVersion: "v1alpha1",
expectedError: "",
},
{
name: "Conversion field, failed=true, missing storedVersion",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"failed": true,
"error": "conversion failed",
},
},
}},
expectedFailed: true,
expectedStoredVersion: "",
expectedError: "conversion failed",
},
{
name: "Conversion field, failed=true, missing error",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{
"conversion": map[string]interface{}{
"failed": true,
"storedVersion": "v2alpha1",
},
},
}},
expectedFailed: true,
expectedStoredVersion: "v2alpha1",
expectedError: "",
},
{
name: "Empty object",
obj: &unstructured.Unstructured{Object: map[string]interface{}{}},
expectedFailed: false,
expectedStoredVersion: "",
expectedError: "",
},
{
// NOTE: the runner below swaps a nil obj for an empty object before the
// call, so getConversionStatus never actually receives a nil pointer here.
name: "Nil object",
obj: nil,
expectedFailed: false,
expectedStoredVersion: "",
expectedError: "",
},
{
name: "Status not a map",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": "not a map",
}},
expectedFailed: false,
expectedStoredVersion: "",
expectedError: "",
},
{
name: "Conversion not a map",
obj: &unstructured.Unstructured{Object: map[string]interface{}{
"metadata": map[string]interface{}{"name": "test"},
"status": map[string]interface{}{
"conversion": "not a map",
},
}},
expectedFailed: false,
expectedStoredVersion: "",
expectedError: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Work on a deep copy of the table entry; a nil entry is replaced by an
// empty object instead of being passed through.
var input *unstructured.Unstructured
if tt.obj != nil {
input = tt.obj.DeepCopy()
} else {
input = &unstructured.Unstructured{Object: map[string]interface{}{}}
}
failed, storedVersion, conversionErr := getConversionStatus(input)
require.Equal(t, tt.expectedFailed, failed, "failed mismatch")
require.Equal(t, tt.expectedStoredVersion, storedVersion, "storedVersion mismatch")
require.Equal(t, tt.expectedError, conversionErr, "conversionErr mismatch")
})
}
}

@ -0,0 +1,20 @@
package client
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// k8sClientMetrics groups the Prometheus collectors used by the dashboards
// K8s client.
type k8sClientMetrics struct {
// fallbackCounter counts requests sent to a stored version, with the
// version carried in the "stored_version" label.
fallbackCounter *prometheus.CounterVec
}
// newK8sClientMetrics creates the client metrics through a promauto factory
// bound to reg.
//
// The only collector is the counter
// grafana_dashboard_stored_version_fallback_total, labelled by stored_version.
func newK8sClientMetrics(reg prometheus.Registerer) *k8sClientMetrics {
	opts := prometheus.CounterOpts{
		Namespace: "grafana",
		Name:      "dashboard_stored_version_fallback_total",
		Help:      "Number of K8s dashboard client requests to storedVersion",
	}
	labels := []string{"stored_version"}

	metrics := &k8sClientMetrics{}
	metrics.fallbackCounter = promauto.With(reg).NewCounterVec(opts, labels)
	return metrics
}

@ -13,6 +13,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -40,9 +41,9 @@ import (
"github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/apiserver" "github.com/grafana/grafana/pkg/services/apiserver"
"github.com/grafana/grafana/pkg/services/apiserver/client" "github.com/grafana/grafana/pkg/services/apiserver/client"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/dashboards/dashboardaccess" "github.com/grafana/grafana/pkg/services/dashboards/dashboardaccess"
dashboardclient "github.com/grafana/grafana/pkg/services/dashboards/service/client"
dashboardsearch "github.com/grafana/grafana/pkg/services/dashboards/service/search" dashboardsearch "github.com/grafana/grafana/pkg/services/dashboards/service/search"
"github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/folder"
@ -59,7 +60,6 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/resource" "github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
"github.com/grafana/grafana/pkg/util/retryer" "github.com/grafana/grafana/pkg/util/retryer"
"go.opentelemetry.io/otel/attribute"
) )
var ( var (
@ -374,8 +374,7 @@ func ProvideDashboardServiceImpl(
serverLockService *serverlock.ServerLockService, serverLockService *serverlock.ServerLockService,
kvstore kvstore.KVStore, kvstore kvstore.KVStore,
) (*DashboardServiceImpl, error) { ) (*DashboardServiceImpl, error) {
k8sHandler := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), dashboardv1alpha1.DashboardResourceInfo.GroupVersionResource(), restConfigProvider.GetRestConfig, dashboardStore, userService, resourceClient, sorter) k8sclient := dashboardclient.NewK8sClientWithFallback(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, r)
dashSvc := &DashboardServiceImpl{ dashSvc := &DashboardServiceImpl{
cfg: cfg, cfg: cfg,
log: log.New("dashboard-service"), log: log.New("dashboard-service"),
@ -386,7 +385,7 @@ func ProvideDashboardServiceImpl(
folderStore: folderStore, folderStore: folderStore,
folderService: folderSvc, folderService: folderSvc,
orgService: orgService, orgService: orgService,
k8sclient: k8sHandler, k8sclient: k8sclient,
metrics: newDashboardsMetrics(r), metrics: newDashboardsMetrics(r),
dashboardPermissionsReady: make(chan struct{}), dashboardPermissionsReady: make(chan struct{}),
publicDashboardService: publicDashboardService, publicDashboardService: publicDashboardService,

Loading…
Cancel
Save