[search] add index; increase page size (#95515)

[search] add index; increase page size
pull/95078/head^2
Scott Lepper 7 months ago committed by GitHub
parent c9abe04d46
commit 337f2a0a8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 33
      pkg/storage/unified/resource/index.go
  2. 5
      pkg/storage/unified/resource/index_mapping.go
  3. 112
      pkg/storage/unified/resource/index_test.go
  4. 4
      pkg/storage/unified/sql/db/migrations/resource_mig.go

@ -16,6 +16,7 @@ import (
) )
const tracingPrexfixIndex = "unified_storage.index." const tracingPrexfixIndex = "unified_storage.index."
// pageSize is the Limit set on each ListRequest that Init issues while
// paginating through resources to index.
const pageSize = 10000
type Shard struct { type Shard struct {
index bleve.Index index bleve.Index
@ -49,27 +50,21 @@ func NewIndex(s *server, opts Opts, path string, tracer tracing.Tracer) *Index {
return idx return idx
} }
func (i *Index) IndexBatch(ctx context.Context, list *ListResponse, kind string) error { func (i *Index) IndexBatch(ctx context.Context, list *ListResponse) error {
ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"CreateIndexBatches") ctx, span := i.tracer.Start(ctx, tracingPrexfixIndex+"CreateIndexBatches")
for _, obj := range list.Items { for _, obj := range list.Items {
res, err := NewIndexedResource(obj.Value) indexableResource, err := NewIndexedResource(obj.Value)
if err != nil {
return err
}
shard, err := i.getShard(res.Namespace)
if err != nil { if err != nil {
return err return err
} }
i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "kind", kind, "tenant", res.Namespace)
// Transform the raw resource into a more generic indexable resource shard, err := i.getShard(indexableResource.Namespace)
indexableResource, err := NewIndexedResource(obj.Value)
if err != nil { if err != nil {
return err return err
} }
i.log.Debug("indexing resource in batch", "batch_count", len(list.Items), "tenant", indexableResource.Namespace)
err = shard.batch.Index(res.Uid, indexableResource) err = shard.batch.Index(indexableResource.Uid, indexableResource)
if err != nil { if err != nil {
return err return err
} }
@ -98,7 +93,7 @@ func (i *Index) Init(ctx context.Context) error {
totalObjectsFetched := 0 totalObjectsFetched := 0
for _, rt := range resourceTypes { for _, rt := range resourceTypes {
i.log.Info("indexing resource", "kind", rt.Key.Resource) i.log.Info("indexing resource", "kind", rt.Key.Resource)
r := &ListRequest{Options: rt, Limit: 100} r := &ListRequest{Options: rt, Limit: pageSize}
// Paginate through the list of resources and index each page // Paginate through the list of resources and index each page
for { for {
@ -111,7 +106,7 @@ func (i *Index) Init(ctx context.Context) error {
totalObjectsFetched += len(list.Items) totalObjectsFetched += len(list.Items)
// Index current page // Index current page
err = i.IndexBatch(ctx, list, rt.Key.Resource) err = i.IndexBatch(ctx, list)
if err != nil { if err != nil {
return err return err
} }
@ -226,6 +221,18 @@ func (i *Index) Search(ctx context.Context, tenant string, query string, limit i
return results, nil return results, nil
} }
// Count returns the total number of documents held across all shards.
//
// The count is best effort: a shard whose DocCount call fails is logged and
// skipped, and the remaining shards are still counted. The first failure
// encountered is returned together with the partial total, so callers can
// tell a complete count from an incomplete one instead of always seeing nil.
func (i *Index) Count() (uint64, error) {
	var (
		total    uint64
		firstErr error
	)
	for _, shard := range i.shards {
		count, err := shard.index.DocCount()
		if err != nil {
			i.log.Error("failed to get doc count", "error", err)
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		total += count
	}
	return total, firstErr
}
type Opts struct { type Opts struct {
Workers int // This controls how many goroutines are used to index objects Workers int // This controls how many goroutines are used to index objects
BatchSize int // This is the batch size for how many objects to add to the index at once BatchSize int // This is the batch size for how many objects to add to the index at once

@ -38,7 +38,7 @@ func (ir IndexedResource) FromSearchHit(hit *search.DocumentMatch) IndexedResour
ir.Title = hit.Fields["Title"].(string) ir.Title = hit.Fields["Title"].(string)
// add indexed spec fields to search results // add indexed spec fields to search results
specResult := map[string]interface{}{} specResult := map[string]any{}
for k, v := range hit.Fields { for k, v := range hit.Fields {
if strings.HasPrefix(k, "Spec.") { if strings.HasPrefix(k, "Spec.") {
specKey := strings.TrimPrefix(k, "Spec.") specKey := strings.TrimPrefix(k, "Spec.")
@ -53,8 +53,6 @@ func (ir IndexedResource) FromSearchHit(hit *search.DocumentMatch) IndexedResour
// NewIndexedResource creates a new IndexedResource from a raw resource. // NewIndexedResource creates a new IndexedResource from a raw resource.
// rawResource is the raw json for the resource from unified storage. // rawResource is the raw json for the resource from unified storage.
func NewIndexedResource(rawResource []byte) (*IndexedResource, error) { func NewIndexedResource(rawResource []byte) (*IndexedResource, error) {
ir := &IndexedResource{}
k8sObj := unstructured.Unstructured{} k8sObj := unstructured.Unstructured{}
err := k8sObj.UnmarshalJSON(rawResource) err := k8sObj.UnmarshalJSON(rawResource)
if err != nil { if err != nil {
@ -66,6 +64,7 @@ func NewIndexedResource(rawResource []byte) (*IndexedResource, error) {
return nil, err return nil, err
} }
ir := &IndexedResource{}
ir.Uid = string(meta.GetUID()) ir.Uid = string(meta.GetUID())
ir.Name = meta.GetName() ir.Name = meta.GetName()
ir.Title = meta.FindTitle("") ir.Title = meta.FindTitle("")

@ -0,0 +1,112 @@
package resource
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/rand"
)
// TestIndexBatch pushes ten generated pages of resources through IndexBatch
// and verifies that one shard exists per namespace and that every generated
// document is reflected in Count.
func TestIndexBatch(t *testing.T) {
	tracingCfg := tracing.NewEmptyTracingConfig()
	trace, err := tracing.ProvideService(tracingCfg)
	if err != nil {
		t.Fatal(err)
	}

	// os.TempDir() does not guarantee a trailing separator, so appending a
	// name to it can produce a sibling of the temp directory (for example
	// "/tmptestindexbatch"). MkdirTemp creates a unique directory inside the
	// temp directory instead.
	tmpdir, err := os.MkdirTemp("", "testindexbatch")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		if err := os.RemoveAll(tmpdir); err != nil {
			t.Errorf("removing %q: %v", tmpdir, err)
		}
	})

	index := &Index{
		tracer: trace,
		shards: make(map[string]Shard),
		path:   tmpdir,
		log:    log.New("unifiedstorage.search.index"),
	}

	ctx := context.Background()
	startAll := time.Now()

	// simulate 10 List calls
	for i := 0; i < 10; i++ {
		list := &ListResponse{Items: loadTestItems(strconv.Itoa(i))}
		start := time.Now()
		if err := index.IndexBatch(ctx, list); err != nil {
			t.Fatal(err)
		}
		fmt.Println("Time elapsed:", time.Since(start))
	}

	fmt.Println("Total Time elapsed:", time.Since(startAll))

	// One shard is expected per namespace used by loadTestItems.
	assert.Equal(t, 3, len(index.shards))

	total, err := index.Count()
	if err != nil {
		t.Fatal(err)
	}

	// 10 pages of 10000 items each.
	assert.Equal(t, uint64(100000), total)
}
// loadTestItems generates 10000 resource wrappers from a JSON template.
//
// Each item's uid is the loop counter followed by the uid argument, so
// callers passing distinct suffixes get distinct uids across calls. The kind
// is drawn at random from kinds and the namespace at random from namespaces.
func loadTestItems(uid string) []*ResourceWrapper {
	const itemCount = 10000

	resource := `{
		"kind": "<kind>",
		"title": "test",
		"metadata": {
			"uid": "<uid>",
			"name": "test",
			"namespace": "<ns>"
		},
		"spec": {
			"title": "test",
			"description": "test",
			"interval": "5m"
		}
	}`

	items := make([]*ResourceWrapper, 0, itemCount)
	for i := 0; i < itemCount; i++ {
		res := strings.Replace(resource, "<uid>", strconv.Itoa(i)+uid, 1)
		// shuffle kinds (must index kinds, not namespaces, or the generated
		// kind is a tenant name)
		kind := kinds[rand.Intn(len(kinds))]
		res = strings.Replace(res, "<kind>", kind, 1)
		// shuffle namespaces
		ns := namespaces[rand.Intn(len(namespaces))]
		res = strings.Replace(res, "<ns>", ns, 1)
		items = append(items, &ResourceWrapper{Value: []byte(res)})
	}
	return items
}
// Fixture values used by loadTestItems when generating resources.
var (
	// namespaces lists the tenants that generated resources are spread over.
	namespaces = []string{"tenant1", "tenant2", "tenant3"}

	// kinds lists the resource kinds available to generated resources.
	kinds = []string{"playlist", "folder"}
)

@ -109,5 +109,9 @@ func initResourceTables(mg *migrator.Migrator) string {
Cols: []string{"group", "resource", "resource_version"}, Type: migrator.IndexType, Cols: []string{"group", "resource", "resource_version"}, Type: migrator.IndexType,
})) }))
mg.AddMigration("Add index to resource for loading", migrator.NewAddIndexMigration(resource_table, &migrator.Index{
Cols: []string{"group", "resource"}, Type: migrator.IndexType,
}))
return marker return marker
} }

Loading…
Cancel
Save