Search Poc: Add metrics (#95111)

pull/95212/head
owensmallwood 7 months ago committed by GitHub
parent 12cc312920
commit 9763199b53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      pkg/setting/setting.go
  2. 4
      pkg/setting/setting_unified_storage.go
  3. 87
      pkg/storage/unified/resource/index.go
  4. 4
      pkg/storage/unified/resource/index_mapping.go
  5. 125
      pkg/storage/unified/resource/index_metrics.go
  6. 40
      pkg/storage/unified/resource/index_server.go
  7. 2
      pkg/storage/unified/sql/server.go

@ -532,6 +532,7 @@ type Cfg struct {
// Unified Storage
UnifiedStorage map[string]UnifiedStorageConfig
IndexPath string
}
type UnifiedStorageConfig struct {
@ -1338,8 +1339,9 @@ func (cfg *Cfg) parseINIFile(iniFile *ini.File) error {
cfg.ScopesListScopesURL = scopesSection.Key("list_scopes_endpoint").MustString("")
cfg.ScopesListDashboardsURL = scopesSection.Key("list_dashboards_endpoint").MustString("")
// read unified storage config
// unified storage config
cfg.setUnifiedStorageConfig()
cfg.setIndexPath()
return nil
}

@ -36,3 +36,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
}
cfg.UnifiedStorage = storageConfig
}
// setIndexPath reads the "index_path" key from the "unified_storage" section
// of the raw configuration and stores its string value in cfg.IndexPath.
// No default is applied here; the value is stored exactly as read.
func (cfg *Cfg) setIndexPath() {
	section := cfg.Raw.Section("unified_storage")
	cfg.IndexPath = section.Key("index_path").String()
}

@ -2,10 +2,10 @@ package resource
import (
"context"
"encoding/json"
"fmt"
golog "log"
"os"
"path/filepath"
"time"
"github.com/blevesearch/bleve/v2"
"github.com/google/uuid"
@ -23,30 +23,37 @@ type Index struct {
opts Opts
s *server
log log.Logger
path string
}
func NewIndex(s *server, opts Opts) *Index {
func NewIndex(s *server, opts Opts, path string) *Index {
if path == "" {
path = os.TempDir()
}
idx := &Index{
s: s,
opts: opts,
shards: make(map[string]Shard),
log: log.New("unifiedstorage.search.index"),
path: path,
}
return idx
}
func (i *Index) IndexBatch(list *ListResponse, kind string) error {
for _, obj := range list.Items {
res, err := getResource(obj.Value)
res, err := NewIndexedResource(obj.Value)
if err != nil {
return err
}
shard, err := i.getShard(tenant(res))
shard, err := i.getShard(res.Namespace)
if err != nil {
return err
}
i.log.Debug("initial indexing resources batch", "count", len(list.Items), "kind", kind, "tenant", tenant(res))
i.log.Debug("initial indexing resources batch", "count", len(list.Items), "kind", kind, "tenant", res.Namespace)
// Transform the raw resource into a more generic indexable resource
indexableResource, err := NewIndexedResource(obj.Value)
@ -54,7 +61,7 @@ func (i *Index) IndexBatch(list *ListResponse, kind string) error {
return err
}
err = shard.batch.Index(res.Metadata.Uid, indexableResource)
err = shard.batch.Index(res.Uid, indexableResource)
if err != nil {
return err
}
@ -72,6 +79,8 @@ func (i *Index) IndexBatch(list *ListResponse, kind string) error {
}
func (i *Index) Init(ctx context.Context) error {
start := time.Now().Unix()
resourceTypes := fetchResourceTypes()
for _, rt := range resourceTypes {
i.log.Info("indexing resource", "kind", rt.Key.Resource)
@ -98,29 +107,37 @@ func (i *Index) Init(ctx context.Context) error {
}
}
end := time.Now().Unix()
if IndexServerMetrics != nil {
IndexServerMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
}
return nil
}
func (i *Index) Index(ctx context.Context, data *Data) error {
res, err := getResource(data.Value.Value)
// Transform the raw resource into a more generic indexable resource
res, err := NewIndexedResource(data.Value.Value)
if err != nil {
return err
}
tenant := tenant(res)
i.log.Debug("indexing resource for tenant", "res", res, "tenant", tenant)
tenant := res.Namespace
i.log.Debug("indexing resource for tenant", "res", string(data.Value.Value), "tenant", tenant)
shard, err := i.getShard(tenant)
if err != nil {
return err
}
// Transform the raw resource into a more generic indexable resource
indexableResource, err := NewIndexedResource(data.Value.Value)
err = shard.index.Index(res.Uid, res)
if err != nil {
return err
}
err = shard.index.Index(res.Metadata.Uid, indexableResource)
if err != nil {
return err
// record latency from when event was created to when it was indexed
latencySeconds := float64(time.Now().UnixMicro()-data.Value.ResourceVersion) / 1e6
if IndexServerMetrics != nil {
IndexServerMetrics.IndexLatency.WithLabelValues(data.Key.Resource).Observe(latencySeconds)
}
return nil
}
@ -182,39 +199,14 @@ func (i *Index) Search(ctx context.Context, tenant string, query string, limit i
return results, nil
}
// tenant returns the namespace stored in the resource's metadata; callers use
// that value as the tenant key.
func tenant(res *Resource) string {
	namespace := res.Metadata.Namespace
	return namespace
}
// SearchSummary is a JSON-serializable view of a resource made of its kind,
// its metadata and its spec.
// NOTE(review): no producer or consumer of this type is visible in this view.
type SearchSummary struct {
Kind string `json:"kind"`
// Metadata is embedded, but the explicit tag makes encoding/json treat it as
// a regular field named "metadata" rather than flattening its fields.
Metadata `json:"metadata"`
// Spec holds the resource spec as a free-form JSON object.
Spec map[string]interface{} `json:"spec"`
}
// Metadata holds the identifying fields of a resource: name, namespace, uid,
// creation timestamp, labels and annotations.
// Only Uid and CreationTimestamp carry explicit JSON tags; the remaining
// fields use encoding/json's default name matching.
type Metadata struct {
Name string
Namespace string
Uid string `json:"uid"`
// CreationTimestamp is kept as the raw string; it is not parsed into a time value here.
CreationTimestamp string `json:"creationTimestamp"`
Labels map[string]string
Annotations map[string]string
}
// Resource is the minimal decoded form of a stored object: its kind, its
// apiVersion and its metadata. getResource populates it with json.Unmarshal.
type Resource struct {
Kind string
ApiVersion string
Metadata Metadata
}
// Opts carries the tuning options handed to NewIndex.
type Opts struct {
Workers int // This controls how many goroutines are used to index objects
BatchSize int // This is the batch size for how many objects to add to the index at once
// NOTE(review): how Concurrent is consumed is not visible in this view — confirm against the indexing code.
Concurrent bool
}
func createFileIndex() (bleve.Index, string, error) {
indexPath := fmt.Sprintf("%s%s.bleve", os.TempDir(), uuid.New().String())
func createFileIndex(path string) (bleve.Index, string, error) {
indexPath := filepath.Join(path, uuid.New().String())
index, err := bleve.New(indexPath, createIndexMappings())
if err != nil {
golog.Fatalf("Failed to create index: %v", err)
@ -222,21 +214,12 @@ func createFileIndex() (bleve.Index, string, error) {
return index, indexPath, err
}
// getResource decodes the JSON document in data into a Resource.
// It returns a nil resource together with the json.Unmarshal error when the
// payload cannot be decoded.
func getResource(data []byte) (*Resource, error) {
	var decoded Resource
	if err := json.Unmarshal(data, &decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}
func (i *Index) getShard(tenant string) (Shard, error) {
shard, ok := i.shards[tenant]
if ok {
return shard, nil
}
index, path, err := createFileIndex()
index, path, err := createFileIndex(i.path)
if err != nil {
return Shard{}, err
}

@ -11,6 +11,7 @@ import (
)
type IndexedResource struct {
Uid string
Group string
Namespace string
Kind string
@ -25,6 +26,7 @@ type IndexedResource struct {
}
func (ir IndexedResource) FromSearchHit(hit *search.DocumentMatch) IndexedResource {
ir.Uid = hit.Fields["Uid"].(string)
ir.Kind = hit.Fields["Kind"].(string)
ir.Name = hit.Fields["Name"].(string)
ir.Namespace = hit.Fields["Namespace"].(string)
@ -64,6 +66,7 @@ func NewIndexedResource(rawResource []byte) (*IndexedResource, error) {
return nil, err
}
ir.Uid = string(meta.GetUID())
ir.Name = meta.GetName()
ir.Title = meta.FindTitle("")
ir.Namespace = meta.GetNamespace()
@ -108,6 +111,7 @@ func createIndexMappings() *mapping.IndexMappingImpl {
func createIndexMappingForKind(resourceKind string) *mapping.DocumentMapping {
// create mappings for top level fields
baseFields := map[string]*mapping.FieldMapping{
"Uid": bleve.NewTextFieldMapping(),
"Group": bleve.NewTextFieldMapping(),
"Namespace": bleve.NewTextFieldMapping(),
"Kind": bleve.NewTextFieldMapping(),

@ -0,0 +1,125 @@
package resource
import (
"os"
"path/filepath"
"sync"
"time"
"github.com/grafana/dskit/instrument"
"github.com/prometheus/client_golang/prometheus"
)
var (
// onceIndex ensures NewIndexMetrics builds IndexServerMetrics only once.
onceIndex sync.Once
// IndexServerMetrics is the package-wide metrics instance. It stays nil until
// NewIndexMetrics has been called, so users must nil-check it before recording.
IndexServerMetrics *IndexMetrics
)
// IndexMetrics bundles the Prometheus metrics of the index server with the
// inputs needed to compute them at collection time. It provides Describe and
// Collect methods so it can be registered as a Prometheus collector.
type IndexMetrics struct {
// IndexDir is the directory whose files are summed to report IndexSize.
IndexDir string
// IndexServer is the server whose index is inspected to report IndexedDocs.
IndexServer *IndexServer
// metrics
// IndexLatency is a histogram of indexing latency in seconds, labelled by "resource".
IndexLatency *prometheus.HistogramVec
// IndexSize is the total size in bytes of the files under IndexDir; set in Collect.
IndexSize prometheus.Gauge
// IndexedDocs is the document count summed over all shards; set in Collect.
IndexedDocs prometheus.Gauge
// IndexCreationTime is a label-less histogram of index creation time in seconds.
IndexCreationTime *prometheus.HistogramVec
}
// IndexCreationBuckets are the classic histogram bucket upper bounds, in
// seconds, used for the index_creation_time_seconds histogram.
var IndexCreationBuckets = []float64{1, 5, 10, 25, 50, 75, 100, 150, 200, 250, 300}
// NewIndexMetrics returns the package-wide IndexMetrics instance, creating it
// on the first call.
//
// Construction is guarded by onceIndex, so indexDir and indexServer only take
// effect on the first invocation; later calls ignore their arguments and
// return the instance that already exists. The metrics are created here but
// not registered with any Prometheus registry; that is left to the caller.
func NewIndexMetrics(indexDir string, indexServer *IndexServer) *IndexMetrics {
	onceIndex.Do(func() {
		latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace:                       "index_server",
			Name:                            "index_latency_seconds",
			Help:                            "Time (in seconds) until index is updated with new event",
			Buckets:                         instrument.DefBuckets,
			NativeHistogramBucketFactor:     1.1, // enable native histograms
			NativeHistogramMaxBucketNumber:  160,
			NativeHistogramMinResetDuration: time.Hour,
		}, []string{"resource"})

		size := prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: "index_server",
			Name:      "index_size",
			Help:      "Size of the index in bytes",
		})

		docs := prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: "index_server",
			Name:      "indexed_docs",
			Help:      "Number of indexed documents by resource",
		})

		creation := prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace:                       "index_server",
			Name:                            "index_creation_time_seconds",
			Help:                            "Time (in seconds) it takes until index is created",
			Buckets:                         IndexCreationBuckets,
			NativeHistogramBucketFactor:     1.1, // enable native histograms
			NativeHistogramMaxBucketNumber:  160,
			NativeHistogramMinResetDuration: time.Hour,
		}, []string{})

		IndexServerMetrics = &IndexMetrics{
			IndexDir:          indexDir,
			IndexServer:       indexServer,
			IndexLatency:      latency,
			IndexSize:         size,
			IndexedDocs:       docs,
			IndexCreationTime: creation,
		}
	})
	return IndexServerMetrics
}
// Collect sends the current value of every index metric to ch.
//
// The two histograms are forwarded as they are. The two gauges are refreshed
// at collection time: IndexSize from the on-disk size of IndexDir, and
// IndexedDocs from the document count of the server's index.
func (s *IndexMetrics) Collect(ch chan<- prometheus.Metric) {
	s.IndexLatency.Collect(ch)
	s.IndexCreationTime.Collect(ch)

	// The size gauge is emitted only when the directory walk succeeds; on
	// error it is omitted from this collection instead of reporting a stale value.
	if sizeBytes, sizeErr := getTotalIndexSize(s.IndexDir); sizeErr == nil {
		s.IndexSize.Set(float64(sizeBytes))
		s.IndexSize.Collect(ch)
	}

	// getTotalDocCount returns 0 when the server has no index yet.
	docCount := getTotalDocCount(s.IndexServer.index)
	s.IndexedDocs.Set(docCount)
	s.IndexedDocs.Collect(ch)
}
// Describe sends the descriptors of every metric that Collect can emit to ch.
//
// All four metrics are described, not just IndexLatency, so that the set of
// descriptors matches what Collect produces: a registry uses them to detect
// conflicting registrations, and a pedantic registry rejects collected
// metrics whose descriptor was never described.
func (s *IndexMetrics) Describe(ch chan<- *prometheus.Desc) {
	s.IndexLatency.Describe(ch)
	s.IndexCreationTime.Describe(ch)
	s.IndexSize.Describe(ch)
	s.IndexedDocs.Describe(ch)
}
// getTotalDocCount sums the document counts of every shard in index.
// A nil index yields 0, and shards whose DocCount call fails are left out of
// the total rather than aborting the sum.
func getTotalDocCount(index *Index) float64 {
	if index == nil {
		return 0
	}

	var docs float64
	for _, shard := range index.shards {
		if n, err := shard.index.DocCount(); err == nil {
			docs += float64(n)
		}
	}
	return docs
}
// getTotalIndexSize walks dir recursively and returns the combined size, in
// bytes, of every non-directory entry found beneath it.
//
// The walk stops at the first error (an unreadable entry, or a file whose info
// cannot be fetched); that error is returned together with the size
// accumulated up to that point.
func getTotalIndexSize(dir string) (int64, error) {
	var sum int64

	visit := func(_ string, entry os.DirEntry, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		// Directories contribute nothing themselves; only their files are counted.
		if entry.IsDir() {
			return nil
		}
		stat, statErr := entry.Info()
		if statErr != nil {
			return statErr
		}
		sum += stat.Size()
		return nil
	}

	walkErr := filepath.WalkDir(dir, visit)
	return sum, walkErr
}

@ -5,8 +5,9 @@ import (
"encoding/json"
"errors"
"log/slog"
"strings"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
@ -16,6 +17,7 @@ type IndexServer struct {
index *Index
ws *indexWatchServer
log *slog.Logger
cfg *setting.Cfg
}
func (is *IndexServer) Search(ctx context.Context, req *SearchRequest) (*SearchResponse, error) {
@ -44,7 +46,7 @@ func (is *IndexServer) Origin(ctx context.Context, req *OriginRequest) (*OriginR
// Load the index
func (is *IndexServer) Load(ctx context.Context) error {
is.index = NewIndex(is.s, Opts{})
is.index = NewIndex(is.s, Opts{}, is.cfg.IndexPath)
err := is.index.Init(ctx)
if err != nil {
return err
@ -86,10 +88,20 @@ func (is *IndexServer) Init(ctx context.Context, rs *server) error {
return nil
}
func NewResourceIndexServer() ResourceIndexServer {
return &IndexServer{
log: slog.Default().With("logger", "index-server"),
func NewResourceIndexServer(cfg *setting.Cfg) ResourceIndexServer {
logger := slog.Default().With("logger", "index-server")
indexServer := &IndexServer{
log: logger,
cfg: cfg,
}
err := prometheus.Register(NewIndexMetrics(cfg.IndexPath, indexServer))
if err != nil {
logger.Warn("Failed to register index metrics", "error", err)
}
return indexServer
}
type ResourceIndexer interface {
@ -191,32 +203,24 @@ type Data struct {
Uid string
}
// getGroup returns the part of the resource's ApiVersion that precedes the
// first "/".
//
// When ApiVersion contains no "/", the whole string is returned (so an empty
// ApiVersion yields ""). This matches the previous strings.Split-based
// behavior; the old `len(v) > 0` guard was always true, which made its
// `return ""` fallback unreachable.
func getGroup(r *Resource) string {
	group, _, _ := strings.Cut(r.ApiVersion, "/")
	return group
}
func getData(wr *WatchEvent_Resource) (*Data, error) {
r, err := getResource(wr.Value)
r, err := NewIndexedResource(wr.Value)
if err != nil {
return nil, err
}
key := &ResourceKey{
Group: getGroup(r),
Group: r.Group,
Resource: r.Kind,
Namespace: r.Metadata.Namespace,
Name: r.Metadata.Name,
Namespace: r.Namespace,
Name: r.Name,
}
value := &ResourceWrapper{
ResourceVersion: wr.Version,
Value: wr.Value,
}
return &Data{Key: key, Value: value, Uid: r.Metadata.Uid}, nil
return &Data{Key: key, Value: value, Uid: r.Uid}, nil
}
func resource(we *WatchEvent) (*WatchEvent_Resource, error) {

@ -50,7 +50,7 @@ func NewResourceServer(ctx context.Context, db infraDB.DB, cfg *setting.Cfg, fea
opts.Lifecycle = store
if features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearch) {
opts.Index = resource.NewResourceIndexServer()
opts.Index = resource.NewResourceIndexServer(cfg)
server, err := resource.NewResourceServer(opts)
if err != nil {
return nil, err

Loading…
Cancel
Save