The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/server/search_server_distributor_t...

429 lines
14 KiB

package server
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"strconv"
"sync"
"testing"
"time"
claims "github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/api"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
"github.com/grafana/grafana/pkg/storage/unified/search"
"github.com/grafana/grafana/pkg/storage/unified/sql"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"k8s.io/component-base/metrics/legacyregistry"
)
// Sizing knobs for the data set TestIntegrationDistributor seeds.
var (
	// namespaceCount is how many stacks (namespaces) the test simulates.
	namespaceCount = 250
	// maxPlaylistPerNamespace is the upper bound on playlists seeded into a single stack.
	maxPlaylistPerNamespace = 50
	// testIndexFileThreshold only has to exceed maxPlaylistPerNamespace so that
	// the indexer never switches to its filesystem-backed index for a namespace.
	testIndexFileThreshold = 200
)
// TestIntegrationDistributor starts a search-server distributor and two
// storage-server instances that join it over memberlist. It then checks that
// every RPC proxied through the distributor (GetStats, CountManagedObjects,
// ListManagedObjects, Search) returns the same response as a standalone
// baseline server seeded on the same database. It also checks that the proxied
// traffic reached every storage instance.
//
// The test runs only against MySQL and is skipped in -short mode.
//
//nolint:gocyclo
func TestIntegrationDistributor(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping integration test")
	}
	dbType := sqlutil.GetTestDBType()
	if dbType != "mysql" {
		t.Skipf("skipping: test requires mysql, got %q", dbType)
	}

	// this next line is to avoid double registration when registering sprinkles metrics
	legacyregistry.Registerer = func() prometheus.Registerer { return prometheus.NewRegistry() }

	db, err := sqlutil.GetTestDB(dbType)
	require.NoError(t, err)

	testNamespaces := make([]string, 0, namespaceCount)
	for i := range namespaceCount {
		testNamespaces = append(testNamespaces, "stacks-"+strconv.Itoa(i))
	}
	baselineServer := createBaselineServer(t, dbType, db.ConnStr, testNamespaces)

	memberlistPort := getRandomPort()
	distributorServer := initDistributorServerForTest(t, memberlistPort)
	testServers := []testModuleServer{
		createStorageServerApi(t, 1, dbType, db.ConnStr, memberlistPort),
		createStorageServerApi(t, 2, dbType, db.ConnStr, memberlistPort),
	}

	startAndWaitHealthy(t, distributorServer)
	for _, testServer := range testServers {
		startAndWaitHealthy(t, testServer)
	}

	// A bounded client so a wedged endpoint fails the subtest instead of hanging it.
	httpClient := &http.Client{Timeout: 10 * time.Second}
	for _, endpoint := range []string{"ring", "memberlist"} {
		t.Run("should expose "+endpoint+" endpoint", func(t *testing.T) {
			res, err := httpClient.Get(fmt.Sprintf("http://localhost:%s/%s", distributorServer.httpPort, endpoint))
			require.NoError(t, err)
			defer func() { _ = res.Body.Close() }()
			require.Equal(t, http.StatusOK, res.StatusCode)
		})
	}

	// requireTrafficOnAllInstances fails unless the number of distinct
	// proxied-instance IDs seen equals the number of storage servers. Every
	// map entry is at least 1 by construction, so only the number of keys
	// tells whether some instance was never hit.
	requireTrafficOnAllInstances := func(t *testing.T, instanceResponseCount map[string]int) {
		t.Helper()
		require.Len(t, instanceResponseCount, len(testServers),
			"expected every storage instance to receive traffic, got per-instance counts %v", instanceResponseCount)
	}

	t.Run("GetStats", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.ResourceStatsRequest{
				Namespace: ns,
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.GetStats)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.GetStats, instanceResponseCount)
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		requireTrafficOnAllInstances(t, instanceResponseCount)
	})

	t.Run("CountManagedObjects", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.CountManagedObjectsRequest{
				Namespace: ns,
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.CountManagedObjects)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.CountManagedObjects, instanceResponseCount)
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		requireTrafficOnAllInstances(t, instanceResponseCount)
	})

	t.Run("ListManagedObjects", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.ListManagedObjectsRequest{
				Namespace: ns,
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.ListManagedObjects)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.ListManagedObjects, instanceResponseCount)
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		requireTrafficOnAllInstances(t, instanceResponseCount)
	})

	t.Run("Search", func(t *testing.T) {
		instanceResponseCount := make(map[string]int)
		for _, ns := range testNamespaces {
			req := &resourcepb.ResourceSearchRequest{
				Options: &resourcepb.ListOptions{
					Key: &resourcepb.ResourceKey{
						Group:     "playlist.grafana.app",
						Resource:  "aoeuaeou",
						Namespace: ns,
					},
				},
			}
			baselineRes := getBaselineResponse(t, req, baselineServer.Search)
			distributorRes := getDistributorResponse(t, req, distributorServer.resourceClient.Search, instanceResponseCount)
			// sometimes the querycost is different between the two. Happens randomly and we don't have control over it
			// as it comes from bleve. Since we are not testing search functionality we hard-set this to 0 to avoid
			// flaky tests
			distributorRes.QueryCost = 0
			baselineRes.QueryCost = 0
			require.Equal(t, baselineRes.String(), distributorRes.String())
		}
		requireTrafficOnAllInstances(t, instanceResponseCount)
	})

	var wg sync.WaitGroup
	for _, testServer := range testServers {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			// t.Errorf rather than require: FailNow may only be called from the
			// goroutine running the test, while Errorf is safe from any goroutine.
			if err := testServer.server.Shutdown(ctx, "tests are done"); err != nil {
				t.Errorf("shutting down %s: %v", testServer.id, err)
			}
		}()
	}
	wg.Wait()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	require.NoError(t, distributorServer.server.Shutdown(ctx, "tests are done"))
}
// getBaselineResponse invokes fn, a method on the in-process baseline server,
// with req and a background context. It fails the test if fn returns an error.
func getBaselineResponse[Req any, Resp any](t *testing.T, req *Req, fn func(ctx context.Context, req *Req) (*Resp, error)) *Resp {
	t.Helper()
	baselineRes, err := fn(context.Background(), req)
	require.NoError(t, err)
	return baselineRes
}
// getDistributorResponse sends req through the distributor's gRPC client
// method fn using a service-identity context. It fails the test on error.
// It also records which instance served the call, read from the single-valued
// "proxied-instance-id" response header, by incrementing that instance's
// entry in instanceResponseCount.
func getDistributorResponse[Req any, Resp any](t *testing.T, req *Req, fn func(ctx context.Context, req *Req, opts ...grpc.CallOption) (*Resp, error), instanceResponseCount map[string]int) *Resp {
	t.Helper()
	// Bound each proxied call so a stuck backend fails the test instead of hanging it.
	ctx, cancel := context.WithTimeout(identity.WithServiceIdentityContext(context.Background(), 1), 30*time.Second)
	defer cancel()

	var header metadata.MD
	res, err := fn(ctx, req, grpc.Header(&header))
	require.NoError(t, err)

	instance := header.Get("proxied-instance-id")
	if len(instance) != 1 {
		t.Fatalf("expected exactly one proxied-instance-id header value, got %q", instance)
	}
	instanceResponseCount[instance[0]]++
	return res
}
// startAndWaitHealthy runs testServer in a background goroutine. It then polls
// the server's gRPC address until a TCP connection succeeds, giving up and
// failing the test after 20 seconds. Finally it asserts that the gRPC health
// service reports SERVING.
func startAndWaitHealthy(t *testing.T, testServer testModuleServer) {
	t.Helper()

	// this next line is to avoid double registration, as both InitializeDocumentBuilders as well as ProvideUnifiedStorageGrpcService
	// are hard-coded to use prometheus.DefaultRegisterer
	// the alternative would be to get the registry from wire, in which case the tests would receive a new
	// registry automatically, but that _may_ change metric names
	// We can remove this once that's fixed
	// Done here, before the `go` statement, so the write happens-before Run reads it
	// rather than racing with it from inside the goroutine.
	prometheus.DefaultRegisterer = prometheus.NewRegistry()

	go func() {
		// t.Errorf, not require: FailNow must only be called from the test goroutine.
		if err := testServer.server.Run(); err != nil && !errors.Is(err, context.Canceled) {
			t.Errorf("server %s exited with error: %v", testServer.id, err)
		}
	}()

	deadline := time.Now().Add(20 * time.Second)
	for {
		conn, err := net.DialTimeout("tcp", testServer.grpcAddress, 1*time.Second)
		if err == nil {
			_ = conn.Close()
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("server failed to become ready: %s (last dial error: %v)", testServer.id, err)
		}
		time.Sleep(1 * time.Second)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	res, err := testServer.healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
	require.NoError(t, err)
	require.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, res.Status)
}
// testModuleServer bundles a ModuleServer under test with the identifiers,
// addresses and clients the integration test uses to reach it.
type testModuleServer struct {
	// id is the server's cfg.InstanceID; it appears in failure messages.
	id string
	// server is the module server itself; Run and Shutdown drive its lifecycle.
	server *ModuleServer

	// grpcAddress is the host:port of the server's gRPC listener.
	grpcAddress string
	// httpPort is the server's HTTP port, kept as the string form from cfg.
	httpPort string

	// healthClient talks to the gRPC health service at grpcAddress.
	healthClient grpc_health_v1.HealthClient
	// resourceClient is only populated for the distributor in this file;
	// storage servers leave it nil.
	resourceClient resource.ResourceClient
}
// getRandomPort asks the OS for a free TCP port on 127.0.0.1 and returns it
// after releasing the listener. The port is only likely to still be free:
// another process may claim it before the caller binds to it.
//
// It panics if no ephemeral port can be obtained. The original code ignored
// that error and dereferenced a nil listener instead.
func getRandomPort() int {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(fmt.Sprintf("getRandomPort: listening on an ephemeral port: %v", err))
	}
	// Read the address before closing so we never touch a closed listener.
	port := ln.Addr().(*net.TCPAddr).Port
	_ = ln.Close()
	return port
}
// initDistributorServerForTest configures, but does not start, a
// search-server-distributor module on random loopback HTTP and gRPC ports.
// It advertises itself over memberlist on memberlistPort. The returned
// testModuleServer has its resourceClient wired to the distributor's gRPC
// address. The client connection is closed when the test finishes.
func initDistributorServerForTest(t *testing.T, memberlistPort int) testModuleServer {
	t.Helper()
	cfg := setting.NewCfg()
	cfg.HTTPPort = strconv.Itoa(getRandomPort())
	cfg.GRPCServer.Network = "tcp"
	cfg.GRPCServer.Address = "127.0.0.1:" + strconv.Itoa(getRandomPort())
	cfg.EnableSharding = true
	cfg.MemberlistBindAddr = "127.0.0.1"
	cfg.MemberlistJoinMember = "127.0.0.1:" + strconv.Itoa(memberlistPort)
	cfg.MemberlistAdvertiseAddr = "127.0.0.1"
	cfg.MemberlistAdvertisePort = memberlistPort
	cfg.Target = []string{modules.SearchServerDistributor}
	cfg.InstanceID = "distributor" // does nothing for the distributor but may be useful to debug tests

	conn, err := grpc.NewClient(cfg.GRPCServer.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	require.NoError(t, err)
	// Release the client connection (and its background goroutines) when the test ends.
	t.Cleanup(func() { _ = conn.Close() })

	server := initModuleServerForTest(t, cfg, Options{}, api.ServerOptions{})
	server.resourceClient = resource.NewLegacyResourceClient(conn, conn)
	return server
}
// createStorageServerApi configures, but does not start, a storage-server
// module named "instance-<instanceId>" backed by the given database. It binds
// HTTP and gRPC to random loopback ports and advertises itself over memberlist
// on its own random port. It joins the memberlist cluster at 127.0.0.1:memberlistPort.
func createStorageServerApi(t *testing.T, instanceId int, dbType, dbConnStr string, memberlistPort int) testModuleServer {
	t.Helper()
	cfg := setting.NewCfg()
	section, err := cfg.Raw.NewSection("database")
	require.NoError(t, err)
	_, err = section.NewKey("type", dbType)
	require.NoError(t, err)
	_, err = section.NewKey("connection_string", dbConnStr)
	require.NoError(t, err)

	cfg.HTTPPort = strconv.Itoa(getRandomPort())
	cfg.GRPCServer.Network = "tcp"
	cfg.GRPCServer.Address = "127.0.0.1:" + strconv.Itoa(getRandomPort())
	cfg.EnableSharding = true
	cfg.MemberlistBindAddr = "127.0.0.1"
	cfg.MemberlistJoinMember = "127.0.0.1:" + strconv.Itoa(memberlistPort)
	cfg.MemberlistAdvertiseAddr = "127.0.0.1"
	cfg.MemberlistAdvertisePort = getRandomPort()
	cfg.InstanceID = "instance-" + strconv.Itoa(instanceId)
	// t.TempDir returns a distinct, auto-removed directory on every call, so
	// instances never share an index path. Appending the instance ID without a
	// separator would instead point outside the directory t.TempDir created.
	cfg.IndexPath = t.TempDir()
	cfg.IndexFileThreshold = testIndexFileThreshold
	cfg.Target = []string{modules.StorageServer}

	return initModuleServerForTest(t, cfg, Options{}, api.ServerOptions{})
}
// initModuleServerForTest constructs, but does not start, a ModuleServer for
// cfg with the unified-storage search feature flag enabled. It pairs the server
// with a gRPC health client aimed at cfg.GRPCServer.Address. The returned
// value's resourceClient is left nil for callers to fill in if they need it.
func initModuleServerForTest(
	t *testing.T,
	cfg *setting.Cfg,
	opts Options,
	apiOpts api.ServerOptions,
) testModuleServer {
	t.Helper()
	tracer := tracing.InitializeTracerForTest()
	ms, err := NewModule(opts, apiOpts, featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch), cfg, nil, nil, prometheus.NewRegistry(), prometheus.DefaultGatherer, tracer, nil)
	require.NoError(t, err)

	// grpc.NewClient performs no I/O up front, so it is fine to create it
	// before the server is listening.
	conn, err := grpc.NewClient(cfg.GRPCServer.Address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	require.NoError(t, err)
	// Close it when the test ends so the connection does not leak.
	t.Cleanup(func() { _ = conn.Close() })

	return testModuleServer{
		server:       ms,
		grpcAddress:  cfg.GRPCServer.Address,
		httpPort:     cfg.HTTPPort,
		healthClient: grpc_health_v1.NewHealthClient(conn),
		id:           cfg.InstanceID,
	}
}
// createBaselineServer builds a standalone SQL-backed resource server, with no
// ring and no distributor, on the given database. It seeds every namespace in
// testNamespaces with between 1 and maxPlaylistPerNamespace playlists, created
// as an admin user. The test uses this server's responses as the reference for
// the distributor's proxied responses.
func createBaselineServer(t *testing.T, dbType, dbConnStr string, testNamespaces []string) resource.ResourceServer {
	t.Helper()
	cfg := setting.NewCfg()
	section, err := cfg.Raw.NewSection("database")
	require.NoError(t, err)
	_, err = section.NewKey("type", dbType)
	require.NoError(t, err)
	_, err = section.NewKey("connection_string", dbConnStr)
	require.NoError(t, err)
	cfg.IndexPath = t.TempDir()
	cfg.IndexFileThreshold = testIndexFileThreshold

	features := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearch)
	docBuilders, err := InitializeDocumentBuilders(cfg)
	require.NoError(t, err)
	tracer := noop.NewTracerProvider().Tracer("test-tracer")
	searchOpts, err := search.NewSearchOptions(features, cfg, tracer, docBuilders, nil)
	require.NoError(t, err)

	server, err := sql.NewResourceServer(sql.ServerOptions{
		DB:             nil,
		Cfg:            cfg,
		Tracer:         tracer,
		Reg:            nil,
		AccessClient:   nil,
		SearchOptions:  searchOpts,
		StorageMetrics: nil,
		IndexMetrics:   nil,
		Features:       features,
		QOSQueue:       nil,
	})
	require.NoError(t, err)

	testUserA := &identity.StaticRequester{
		Type:           claims.TypeUser,
		Login:          "testuser",
		UserID:         123,
		UserUID:        "u123",
		OrgRole:        identity.RoleAdmin,
		IsGrafanaAdmin: true, // can do anything
	}
	ctx := claims.WithAuthInfo(context.Background(), testUserA)

	// rand.Intn(n)+1 yields 1..maxPlaylistPerNamespace playlists per namespace.
	for _, ns := range testNamespaces {
		for range rand.Intn(maxPlaylistPerNamespace) + 1 {
			_, err = server.Create(ctx, generatePlaylistPayload(ns))
			require.NoError(t, err)
		}
	}
	return server
}
// counter numbers generated playlists so each one gets a unique name
// ("playlist0", "playlist1", ...) across all namespaces. It is unsynchronized
// package-level state; the callers in this file invoke it sequentially.
var counter int

// generatePlaylistPayload returns a CreateRequest for a new Playlist in
// namespace ns. The object name comes from counter, which is advanced by one
// per call. Every other field of the JSON body is fixed.
func generatePlaylistPayload(ns string) *resourcepb.CreateRequest {
	seq := counter
	counter++
	playlistName := "playlist" + strconv.Itoa(seq)

	body := fmt.Sprintf(`{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "%s",
"uid": "xyz",
"namespace": "%s",
"annotations": {
"grafana.app/repoName": "elsewhere",
"grafana.app/repoPath": "path/to/item",
"grafana.app/repoTimestamp": "2024-02-02T00:00:00Z"
}
},
"spec": {
"title": "hello",
"interval": "5m",
"items": [
{
"type": "dashboard_by_uid",
"value": "vmie2cmWz"
}
]
}
}`, playlistName, ns)

	key := &resourcepb.ResourceKey{
		Namespace: ns,
		Group:     "playlist.grafana.app",
		Resource:  "aoeuaeou",
		Name:      playlistName,
	}
	return &resourcepb.CreateRequest{Key: key, Value: []byte(body)}
}