unified-storage: Add integration tests for distributor (#105771)

* Add integration tests for distributor
pull/107395/head
Will Assis 3 weeks ago committed by GitHub
parent 99782ae406
commit 7aad041f70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 415
      pkg/server/distributor_test.go
  2. 2
      pkg/server/memberlist.go
  3. 1
      pkg/setting/setting.go
  4. 1
      pkg/setting/setting_unified_storage.go
  5. 2
      pkg/storage/unified/resource/distributor.go

@ -0,0 +1,415 @@
package server
import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"path/filepath"
	"strconv"
	"sync"
	"testing"
	"time"

	claims "github.com/grafana/authlib/types"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/trace/noop"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/metadata"
	"k8s.io/component-base/metrics/legacyregistry"

	"github.com/grafana/grafana/pkg/api"
	"github.com/grafana/grafana/pkg/apimachinery/identity"
	"github.com/grafana/grafana/pkg/modules"
	"github.com/grafana/grafana/pkg/services/featuremgmt"
	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/storage/unified/resource"
	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
	"github.com/grafana/grafana/pkg/storage/unified/search"
	"github.com/grafana/grafana/pkg/storage/unified/sql"
)
// Tunables controlling the scale of the integration test.
var (
	testIndexFileThreshold = 200 // just needs to be bigger than max playlist number, so the indexer doesn't use the filesystem
	namespaceCount         = 250 // how many stacks we're simulating
	maxPlaylistPerNamespace = 50 // upper bound on how many playlists we will seed to each stack.
)
// TestIntegrationDistributor starts one distributor and two storage-server
// module instances joined over memberlist and backed by the same MySQL
// database, seeds playlists through a standalone baseline resource server,
// and then verifies that:
//   - the distributor exposes its /ring and /memberlist HTTP endpoints;
//   - GetStats, CountManagedObjects, ListManagedObjects and Search requests
//     proxied through the distributor return responses identical to the
//     baseline server's direct responses for every namespace;
//   - both storage instances receive at least some of the proxied traffic
//     (checked via the proxied-instance-id response header).
//
//nolint:gocyclo
func TestIntegrationDistributor(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	// The test is only exercised against MySQL; other backends are skipped.
	dbType := sqlutil.GetTestDBType()
	if dbType != "mysql" {
		t.Skip()
	}
	// this next line is to avoid double registration when registering sprinkles metrics
	legacyregistry.Registerer = func() prometheus.Registerer { return prometheus.NewRegistry() }
	db, err := sqlutil.GetTestDB(dbType)
	require.NoError(t, err)
	// One namespace per simulated stack: "stacks-0" ... "stacks-<namespaceCount-1>".
	testNamespaces := make([]string, 0, namespaceCount)
	for i := range namespaceCount {
		testNamespaces = append(testNamespaces, "stacks-"+strconv.Itoa(i))
	}
	// The baseline server seeds the shared database and supplies the expected
	// responses that the distributor's proxied responses are compared against.
	baselineServer := createBaselineServer(t, dbType, db.ConnStr, testNamespaces)
	testServers := make([]testModuleServer, 0, 2)
	memberlistPort := getRandomPort()
	distributorServer := initDistributorServerForTest(t, memberlistPort)
	testServers = append(testServers, createStorageServerApi(t, 1, dbType, db.ConnStr, memberlistPort))
	testServers = append(testServers, createStorageServerApi(t, 2, dbType, db.ConnStr, memberlistPort))
	// The distributor must be up first so the storage servers can join its ring.
	startAndWaitHealthy(t, distributorServer)
	for _, testServer := range testServers {
		startAndWaitHealthy(t, testServer)
	}
	t.Run("should expose ring endpoint", func(t *testing.T) {
		client := http.Client{}
		res, err := client.Get(fmt.Sprintf("http://localhost:%s/ring", distributorServer.httpPort))
		require.NoError(t, err)
		require.Equal(t, res.StatusCode, http.StatusOK)
		_ = res.Body.Close()
	})
	t.Run("should expose memberlist endpoint", func(t *testing.T) {
		client := http.Client{}
		res, err := client.Get(fmt.Sprintf("http://localhost:%s/memberlist", distributorServer.httpPort))
		require.NoError(t, err)
		require.Equal(t, res.StatusCode, http.StatusOK)
		_ = res.Body.Close()
	})
	t.Run("GetStats", func(t *testing.T) {
		// instanceResponseCount tracks which storage instance answered each request.
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.ResourceStatsRequest{
				Namespace: ns,
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.GetStats)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.GetStats, instanceResponseCount)
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		// Every instance must have served at least one request, proving distribution.
		for instance, count := range instanceResponseCount {
			require.GreaterOrEqual(t, count, 1, "instance did not get any traffic: "+instance)
		}
	})
	t.Run("CountManagedObjects", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.CountManagedObjectsRequest{
				Namespace: ns,
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.CountManagedObjects)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.CountManagedObjects, instanceResponseCount)
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		for instance, count := range instanceResponseCount {
			require.GreaterOrEqual(t, count, 1, "instance did not get any traffic: "+instance)
		}
	})
	t.Run("ListManagedObjects", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.ListManagedObjectsRequest{
				Namespace: ns,
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.ListManagedObjects)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.ListManagedObjects, instanceResponseCount)
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		for instance, count := range instanceResponseCount {
			require.GreaterOrEqual(t, count, 1, "instance did not get any traffic: "+instance)
		}
	})
	t.Run("Search", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.ResourceSearchRequest{
				Options: &resourcepb.ListOptions{
					Key: &resourcepb.ResourceKey{
						Group:     "playlist.grafana.app",
						Resource:  "aoeuaeou",
						Namespace: ns,
					},
				},
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.Search)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.Search, instanceResponseCount)
			// sometimes the querycost is different between the two. Happens randomly and we don't have control over it
			// as it comes from bleve. Since we are not testing search functionality we hard-set this to 0 to avoid
			// flaky tests
			distributorRes.QueryCost = 0
			baselineRes.QueryCost = 0
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		for instance, count := range instanceResponseCount {
			require.GreaterOrEqual(t, count, 1, "instance did not get any traffic: "+instance)
		}
	})
	// Shut down all storage servers concurrently, then the distributor last.
	var wg sync.WaitGroup
	for _, testServer := range testServers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			if err := testServer.server.Shutdown(ctx, "tests are done"); err != nil {
				require.NoError(t, err)
			}
		}()
	}
	wg.Wait()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := distributorServer.server.Shutdown(ctx, "tests are done"); err != nil {
		require.NoError(t, err)
	}
}
// getBaselineResponse calls fn (a direct method on the baseline resource
// server) with req, fails the test on error, and returns the response.
func getBaselineResponse[Req any, Resp any](t *testing.T, req *Req, fn func(ctx context.Context, req *Req) (*Resp, error)) *Resp {
	// Mark as a helper so failures are reported at the caller's line.
	t.Helper()
	ctx := context.Background()
	baselineRes, err := fn(ctx, req)
	require.NoError(t, err)
	return baselineRes
}
// getDistributorResponse calls fn (a gRPC client method routed through the
// distributor) with req under a service identity, fails the test on error,
// and returns the response. It also reads the proxied-instance-id response
// header to record which storage instance actually served the request,
// incrementing that instance's count in instanceResponseCount.
func getDistributorResponse[Req any, Resp any](t *testing.T, req *Req, fn func(ctx context.Context, req *Req, opts ...grpc.CallOption) (*Resp, error), instanceResponseCount map[string]int) *Resp {
	// Mark as a helper so failures are reported at the caller's line.
	t.Helper()
	ctx := identity.WithServiceIdentityContext(context.Background(), 1)
	var header metadata.MD
	res, err := fn(ctx, req, grpc.Header(&header))
	require.NoError(t, err)
	instance := header.Get("proxied-instance-id")
	if len(instance) != 1 {
		t.Fatal("received invalid proxied-instance-id header", instance)
	}
	instanceResponseCount[instance[0]] += 1
	return res
}
// startAndWaitHealthy launches testServer.server.Run in a background goroutine
// and blocks until the server's gRPC listener accepts TCP connections and the
// standard health service reports SERVING. Fails the test if the server does
// not become reachable within 20 seconds.
func startAndWaitHealthy(t *testing.T, testServer testModuleServer) {
	t.Helper()
	go func() {
		// this next line is to avoid double registration, as both InitializeDocumentBuilders as well as ProvideUnifiedStorageGrpcService
		// are hard-coded to use prometheus.DefaultRegisterer
		// the alternative would be to get the registry from wire, in which case the tests would receive a new
		// registry automatically, but that _may_ change metric names
		// We can remove this once that's fixed
		prometheus.DefaultRegisterer = prometheus.NewRegistry()
		if err := testServer.server.Run(); err != nil && !errors.Is(err, context.Canceled) {
			// require.NoError calls t.FailNow, which must not be invoked from a
			// goroutine other than the test's own; t.Errorf is safe here.
			t.Errorf("server %s exited with error: %v", testServer.id, err)
		}
	}()
	// Poll the gRPC address until the TCP listener is up or the deadline passes.
	deadline := time.Now().Add(20 * time.Second)
	for {
		conn, err := net.DialTimeout("tcp", testServer.grpcAddress, 1*time.Second)
		if err == nil {
			_ = conn.Close()
			break
		}
		if time.Now().After(deadline) {
			t.Fatal("server failed to become ready: ", testServer.id)
		}
		time.Sleep(1 * time.Second)
	}
	// Confirm the server reports healthy, not merely listening.
	res, err := testServer.healthClient.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
	require.NoError(t, err)
	require.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, res.Status)
}
// testModuleServer bundles a ModuleServer under test with the clients and
// addresses the test needs in order to talk to it.
type testModuleServer struct {
	server         *ModuleServer               // the module server (distributor or storage-server target)
	healthClient   grpc_health_v1.HealthClient // health client dialing grpcAddress
	resourceClient resource.ResourceClient     // set only for the distributor server
	id             string                      // cfg.InstanceID, used in failure messages
	grpcAddress    string                      // host:port of the gRPC listener
	httpPort       string                      // HTTP port serving /ring and /memberlist
}
// getRandomPort reserves a free TCP port on 127.0.0.1 by briefly listening on
// port 0 and returning the port the OS assigned. The listener is closed before
// returning, so the port is free (though not strictly guaranteed to stay so).
func getRandomPort() int {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		// The original ignored this error and would have dereferenced a nil
		// listener; failing loudly is the only sensible option in a test helper.
		panic(fmt.Sprintf("getRandomPort: could not listen on 127.0.0.1:0: %v", err))
	}
	port := ln.Addr().(*net.TCPAddr).Port
	_ = ln.Close()
	return port
}
// initDistributorServerForTest builds a ModuleServer configured to run only
// the distributor target, bound to random local ports and advertising the
// given memberlist port, plus a resource client dialing its gRPC address.
// The returned server is not started.
func initDistributorServerForTest(t *testing.T, memberlistPort int) testModuleServer {
	t.Helper()
	cfg := setting.NewCfg()
	cfg.HTTPPort = strconv.Itoa(getRandomPort())
	cfg.GRPCServer.Network = "tcp"
	cfg.GRPCServer.Address = "127.0.0.1:" + strconv.Itoa(getRandomPort())
	cfg.EnableSharding = true
	cfg.MemberlistBindAddr = "127.0.0.1"
	cfg.MemberlistJoinMember = "127.0.0.1:" + strconv.Itoa(memberlistPort)
	cfg.MemberlistAdvertiseAddr = "127.0.0.1"
	cfg.MemberlistAdvertisePort = memberlistPort
	cfg.Target = []string{modules.Distributor}
	cfg.InstanceID = "distributor" // does nothing for the distributor but may be useful to debug tests
	// grpc.NewClient dials lazily, so creating the client before the server is
	// started is safe.
	conn, err := grpc.NewClient(cfg.GRPCServer.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	require.NoError(t, err)
	client := resource.NewLegacyResourceClient(conn, conn)
	server := initModuleServerForTest(t, cfg, Options{}, api.ServerOptions{})
	server.resourceClient = client
	return server
}
// createStorageServerApi builds a ModuleServer configured to run only the
// storage-server target, pointed at the shared test database and joining the
// distributor's memberlist ring. instanceID distinguishes the instances in
// headers, logs and index paths. The returned server is not started.
func createStorageServerApi(t *testing.T, instanceID int, dbType, dbConnStr string, memberlistPort int) testModuleServer {
	t.Helper()
	cfg := setting.NewCfg()
	section, err := cfg.Raw.NewSection("database")
	require.NoError(t, err)
	_, err = section.NewKey("type", dbType)
	require.NoError(t, err)
	_, err = section.NewKey("connection_string", dbConnStr)
	require.NoError(t, err)
	cfg.HTTPPort = strconv.Itoa(getRandomPort())
	cfg.GRPCServer.Network = "tcp"
	cfg.GRPCServer.Address = "127.0.0.1:" + strconv.Itoa(getRandomPort())
	cfg.EnableSharding = true
	cfg.MemberlistBindAddr = "127.0.0.1"
	cfg.MemberlistJoinMember = "127.0.0.1:" + strconv.Itoa(memberlistPort)
	cfg.MemberlistAdvertiseAddr = "127.0.0.1"
	cfg.MemberlistAdvertisePort = getRandomPort()
	cfg.InstanceID = "instance-" + strconv.Itoa(instanceID)
	// Use filepath.Join: plain string concatenation produced a *sibling* of
	// the managed temp dir (".../001instance-1") rather than a path inside it.
	cfg.IndexPath = filepath.Join(t.TempDir(), cfg.InstanceID)
	cfg.IndexFileThreshold = testIndexFileThreshold
	cfg.Target = []string{modules.StorageServer}
	return initModuleServerForTest(t, cfg, Options{}, api.ServerOptions{})
}
// initModuleServerForTest constructs a ModuleServer from cfg with the
// unified-storage-search feature flag enabled, together with a gRPC health
// client dialing its configured address. The server is not started.
func initModuleServerForTest(
	t *testing.T,
	cfg *setting.Cfg,
	opts Options,
	apiOpts api.ServerOptions,
) testModuleServer {
	t.Helper()
	// A fresh registry per server avoids metric double-registration between instances.
	ms, err := NewModule(opts, apiOpts, featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch), cfg, nil, nil, prometheus.NewRegistry(), prometheus.DefaultGatherer, nil)
	require.NoError(t, err)
	// grpc.NewClient dials lazily; the connection is established on first use.
	conn, err := grpc.NewClient(cfg.GRPCServer.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	require.NoError(t, err)
	healthClient := grpc_health_v1.NewHealthClient(conn)
	return testModuleServer{server: ms, grpcAddress: cfg.GRPCServer.Address, httpPort: cfg.HTTPPort, healthClient: healthClient, id: cfg.InstanceID}
}
// createBaselineServer builds a standalone resource server backed by the same
// database as the storage-server instances and seeds every namespace in
// testNamespaces with a random number (1..maxPlaylistPerNamespace) of
// playlists. Its direct responses serve as the expected values that the
// distributor's proxied responses are compared against.
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.ResourceServer {
	t.Helper()
	cfg := setting.NewCfg()
	section, err := cfg.Raw.NewSection("database")
	require.NoError(t, err)
	_, err = section.NewKey("type", dbType)
	require.NoError(t, err)
	_, err = section.NewKey("connection_string", dbConnStr)
	require.NoError(t, err)
	cfg.IndexPath = t.TempDir()
	cfg.IndexFileThreshold = testIndexFileThreshold
	features := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch)
	docBuilders, err := InitializeDocumentBuilders(cfg)
	require.NoError(t, err)
	// noop tracer: this constructor returns no error, so the original's second
	// require.NoError after it was a redundant re-check of a stale err.
	tracer := noop.NewTracerProvider().Tracer("test-tracer")
	searchOpts, err := search.NewSearchOptions(features, cfg, tracer, docBuilders, nil)
	require.NoError(t, err)
	server, err := sql.NewResourceServer(nil, cfg, tracer, nil, nil, searchOpts, nil, nil, features)
	require.NoError(t, err)
	// An admin identity so the seeding Create calls are authorized.
	testUserA := &identity.StaticRequester{
		Type:           claims.TypeUser,
		Login:          "testuser",
		UserID:         123,
		UserUID:        "u123",
		OrgRole:        identity.RoleAdmin,
		IsGrafanaAdmin: true, // can do anything
	}
	ctx := claims.WithAuthInfo(context.Background(), testUserA)
	for _, ns := range testNamespaces {
		for range rand.Intn(maxPlaylistPerNamespace) + 1 {
			_, err = server.Create(ctx, generatePlaylistPayload(ns))
			require.NoError(t, err)
		}
	}
	return server
}
// counter makes every generated playlist name unique across all namespaces.
var counter int

// generatePlaylistPayload builds a CreateRequest for a uniquely-named playlist
// in namespace ns. The grafana.app/repo* annotations mark the object as
// "managed", which is what CountManagedObjects and ListManagedObjects report on.
func generatePlaylistPayload(ns string) *resourcepb.CreateRequest {
	name := "playlist" + strconv.Itoa(counter)
	counter += 1
	return &resourcepb.CreateRequest{
		Value: []byte(fmt.Sprintf(`{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "%s",
"uid": "xyz",
"namespace": "%s",
"annotations": {
"grafana.app/repoName": "elsewhere",
"grafana.app/repoPath": "path/to/item",
"grafana.app/repoTimestamp": "2024-02-02T00:00:00Z"
}
},
"spec": {
"title": "hello",
"interval": "5m",
"items": [
{
"type": "dashboard_by_uid",
"value": "vmie2cmWz"
}
]
}
}`, name, ns)),
		Key: &resourcepb.ResourceKey{
			Group:     "playlist.grafana.app",
			Resource:  "aoeuaeou",
			Namespace: ns,
			Name:      name,
		},
	}
}

@ -51,6 +51,8 @@ func toMemberlistConfig(cfg *setting.Cfg) *memberlist.KVConfig {
if cfg.MemberlistAdvertiseAddr != "" {
memberlistKVcfg.AdvertiseAddr = cfg.MemberlistAdvertiseAddr
}
memberlistKVcfg.AdvertisePort = cfg.MemberlistAdvertisePort
memberlistKVcfg.TCPTransport.BindPort = cfg.MemberlistAdvertisePort
memberlistKVcfg.JoinMembers = []string{cfg.MemberlistJoinMember}
return memberlistKVcfg

@ -569,6 +569,7 @@ type Cfg struct {
EnableSharding bool
MemberlistBindAddr string
MemberlistAdvertiseAddr string
MemberlistAdvertisePort int
MemberlistJoinMember string
MemberlistClusterLabel string
MemberlistClusterLabelVerificationDisabled bool

@ -58,6 +58,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
cfg.EnableSharding = section.Key("enable_sharding").MustBool(false)
cfg.MemberlistBindAddr = section.Key("memberlist_bind_addr").String()
cfg.MemberlistAdvertiseAddr = section.Key("memberlist_advertise_addr").String()
cfg.MemberlistAdvertisePort = section.Key("memberlist_advertise_port").MustInt(7946)
cfg.MemberlistJoinMember = section.Key("memberlist_join_member").String()
cfg.MemberlistClusterLabel = section.Key("memberlist_cluster_label").String()
cfg.MemberlistClusterLabelVerificationDisabled = section.Key("memberlist_cluster_label_verification_disabled").MustBool(false)

@ -246,6 +246,8 @@ func (ds *distributorServer) getClientToDistributeRequest(ctx context.Context, n
ds.log.Info("distributing request to ", "methodName", methodName, "instanceId", rs.Instances[0].Id)
_ = grpc.SetHeader(ctx, metadata.Pairs("proxied-instance-id", rs.Instances[0].Id))
return userutils.InjectOrgID(metadata.NewOutgoingContext(ctx, md), namespace), client.(*RingClient).Client, nil
}

Loading…
Cancel
Save