From 337f2a0a8bdacca5d1779cf8219f73f393d6310b Mon Sep 17 00:00:00 2001 From: Scott Lepper Date: Mon, 28 Oct 2024 15:48:49 -0400 Subject: [PATCH] [search] add index; increase page size (#95515) [search] add index; increase page size --- pkg/storage/unified/resource/index.go | 33 ++++-- pkg/storage/unified/resource/index_mapping.go | 5 +- pkg/storage/unified/resource/index_test.go | 112 ++++++++++++++++++ .../unified/sql/db/migrations/resource_mig.go | 4 + 4 files changed, 138 insertions(+), 16 deletions(-) create mode 100644 pkg/storage/unified/resource/index_test.go diff --git a/pkg/storage/unified/resource/index.go b/pkg/storage/unified/resource/index.go index c184c8679b4..dba715abb1d 100644 --- a/pkg/storage/unified/resource/index.go +++ b/pkg/storage/unified/resource/index.go @@ -16,6 +16,7 @@ import ( ) const tracingPrexfixIndex = "unified_storage.index." +const pageSize = 10000 type Shard struct { index bleve.Index @@ -49,27 +50,21 @@ func NewIndex(s *server, opts Opts, path string, tracer tracing.Tracer) *Index { return idx } -func (i *Index) IndexBatch(ctx context.Context, list *ListResponse, kind string) error { +func (i *Index) IndexBatch(ctx context.Context, list *ListResponse) error { ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"CreateIndexBatches") for _, obj := range list.Items { - res, err := NewIndexedResource(obj.Value) - if err != nil { - return err - } - - shard, err := i.getShard(res.Namespace) + indexableResource, err := NewIndexedResource(obj.Value) if err != nil { return err } - i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "kind", kind, "tenant", res.Namespace) - // Transform the raw resource into a more generic indexable resource - indexableResource, err := NewIndexedResource(obj.Value) + shard, err := i.getShard(indexableResource.Namespace) if err != nil { return err } + i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "tenant", indexableResource.Namespace) - err = shard.batch.Index(res.Uid, indexableResource) + err = shard.batch.Index(indexableResource.Uid, indexableResource) if err != nil { return err } @@ -98,7 +93,7 @@ func (i *Index) Init(ctx context.Context) error { totalObjectsFetched := 0 for _, rt := range resourceTypes { i.log.Info("indexing resource", "kind", rt.Key.Resource) - r := &ListRequest{Options: rt, Limit: 100} + r := &ListRequest{Options: rt, Limit: pageSize} // Paginate through the list of resources and index each page for { @@ -111,7 +106,7 @@ func (i *Index) Init(ctx context.Context) error { totalObjectsFetched += len(list.Items) // Index current page - err = i.IndexBatch(ctx, list, rt.Key.Resource) + err = i.IndexBatch(ctx, list) if err != nil { return err } @@ -226,6 +221,18 @@ func (i *Index) Search(ctx context.Context, tenant string, query string, limit i return results, nil } +func (i *Index) Count() (uint64, error) { + var total uint64 + for _, shard := range i.shards { + count, err := shard.index.DocCount() + if err != nil { + i.log.Error("failed to get doc count", "error", err) + } + total += count + } + return total, nil +} + type Opts struct { Workers int // This controls how many goroutines are used to index objects BatchSize int // This is the batch size for how many objects to add to the index at once diff --git a/pkg/storage/unified/resource/index_mapping.go b/pkg/storage/unified/resource/index_mapping.go index 17eb610c256..f5fee18997f 100644 --- a/pkg/storage/unified/resource/index_mapping.go +++ b/pkg/storage/unified/resource/index_mapping.go @@ -38,7 +38,7 @@ func (ir IndexedResource) FromSearchHit(hit *search.DocumentMatch) IndexedResour ir.Title = hit.Fields["Title"].(string) // add indexed spec fields to search results - specResult := map[string]interface{}{} + specResult := map[string]any{} for k, v := range hit.Fields { if strings.HasPrefix(k, "Spec.") { specKey := strings.TrimPrefix(k, "Spec.") @@ -53,8 +53,6 @@ func (ir IndexedResource) FromSearchHit(hit *search.DocumentMatch) IndexedResour // NewIndexedResource creates a new IndexedResource from a raw resource. // rawResource is the raw json for the resource from unified storage. func NewIndexedResource(rawResource []byte) (*IndexedResource, error) { - ir := &IndexedResource{} - k8sObj := unstructured.Unstructured{} err := k8sObj.UnmarshalJSON(rawResource) if err != nil { @@ -66,6 +64,7 @@ func NewIndexedResource(rawResource []byte) (*IndexedResource, error) { return nil, err } + ir := &IndexedResource{} ir.Uid = string(meta.GetUID()) ir.Name = meta.GetName() ir.Title = meta.FindTitle("") diff --git a/pkg/storage/unified/resource/index_test.go b/pkg/storage/unified/resource/index_test.go new file mode 100644 index 00000000000..3efd32b99a7 --- /dev/null +++ b/pkg/storage/unified/resource/index_test.go @@ -0,0 +1,112 @@ +package resource + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" +) + +func TestIndexBatch(t *testing.T) { + tracingCfg := tracing.NewEmptyTracingConfig() + trace, err := tracing.ProvideService(tracingCfg) + if err != nil { + t.Fatal(err) + return + } + + tmpdir := os.TempDir() + "testindexbatch" + + defer func() { + err := os.RemoveAll(tmpdir) + if err != nil { + t.Fatal(err) + return + } + }() + + index := &Index{ + tracer: trace, + shards: make(map[string]Shard), + path: tmpdir, + log: log.New("unifiedstorage.search.index"), + } + + ctx := context.Background() + startAll := time.Now() + + // simulate 10 List calls + for i := 0; i < 10; i++ { + list := &ListResponse{Items: loadTestItems(strconv.Itoa(i))} + start := time.Now() + err = index.IndexBatch(ctx, list) + if err != nil { + t.Fatal(err) + return + } + elapsed := time.Since(start) + fmt.Println("Time elapsed:", elapsed) + } + + elapsed := time.Since(startAll) + fmt.Println("Total Time elapsed:", elapsed) + + assert.Equal(t, 3, len(index.shards)) + + total, err := index.Count() + if err != nil { + t.Fatal(err) + return + } + + assert.Equal(t, uint64(100000), total) +} + +func loadTestItems(uid string) []*ResourceWrapper { + resource := `{ + "kind": "", + "title": "test", + "metadata": { + "uid": "", + "name": "test", + "namespace": "" + }, + "spec": { + "title": "test", + "description": "test", + "interval": "5m" + } + }` + + items := []*ResourceWrapper{} + for i := 0; i < 10000; i++ { + res := strings.Replace(resource, "", strconv.Itoa(i)+uid, 1) + // shuffle kinds + kind := namespaces[rand.Intn(len(kinds))] + res = strings.Replace(res, "", kind, 1) + // shuffle namespaces + ns := namespaces[rand.Intn(len(namespaces))] + res = strings.Replace(res, "", ns, 1) + items = append(items, &ResourceWrapper{Value: []byte(res)}) + } + return items +} + +var namespaces = []string{ + "tenant1", + "tenant2", + "tenant3", +} + +var kinds = []string{ + "playlist", + "folder", +} diff --git a/pkg/storage/unified/sql/db/migrations/resource_mig.go b/pkg/storage/unified/sql/db/migrations/resource_mig.go index 47963cc61f3..92466dd50c0 100644 --- a/pkg/storage/unified/sql/db/migrations/resource_mig.go +++ b/pkg/storage/unified/sql/db/migrations/resource_mig.go @@ -109,5 +109,9 @@ func initResourceTables(mg *migrator.Migrator) string { Cols: []string{"group", "resource", "resource_version"}, Type: migrator.IndexType, })) + mg.AddMigration("Add index to resource for loading", migrator.NewAddIndexMigration(resource_table, &migrator.Index{ + Cols: []string{"group", "resource"}, Type: migrator.IndexType, + })) + return marker }