Unified Storage Indexer: Uses in-memory index (#95576)

uses in-memory index
pull/95583/head
owensmallwood 7 months ago committed by GitHub
parent af1a732821
commit 50a6069532
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 51
      pkg/storage/unified/resource/index.go
  2. 46
      pkg/storage/unified/resource/index_test.go

@ -24,6 +24,13 @@ type Shard struct {
batch *bleve.Batch
}
type Opts struct {
Workers int // This controls how many goroutines are used to index objects
BatchSize int // This is the batch size for how many objects to add to the index at once
ListLimit int // This is how big the List page size is. If the response size is too large, the number of items will be limited by the server.
IndexDir string // The directory where the indexes for each tenant are stored
}
type Index struct {
shards map[string]Shard
opts Opts
@ -292,22 +299,6 @@ func (i *Index) Count() (uint64, error) {
return total, nil
}
type Opts struct {
Workers int // This controls how many goroutines are used to index objects
BatchSize int // This is the batch size for how many objects to add to the index at once
ListLimit int // This is how big the List page size is. If the response size is too large, the number of items will be limited by the server.
IndexDir string // The directory where the indexes for each tenant are stored
}
func createFileIndex(path string) (bleve.Index, string, error) {
indexPath := filepath.Join(path, uuid.New().String())
index, err := bleve.New(indexPath, createIndexMappings())
if err != nil {
golog.Fatalf("Failed to create index: %v", err)
}
return index, indexPath, err
}
func (i *Index) allTenants() []string {
tenants := make([]string, 0, len(i.shards))
for tenant := range i.shards {
@ -321,7 +312,8 @@ func (i *Index) getShard(tenant string) (Shard, error) {
if ok {
return shard, nil
}
index, path, err := createFileIndex(i.opts.IndexDir)
index, path, err := i.createIndex()
if err != nil {
return Shard{}, err
}
@ -336,6 +328,31 @@ func (i *Index) getShard(tenant string) (Shard, error) {
return shard, nil
}
func (i *Index) createIndex() (bleve.Index, string, error) {
if i.opts.IndexDir == "" {
return createInMemoryIndex()
}
return createFileIndex(i.opts.IndexDir)
}
var mappings = createIndexMappings()
// less memory intensive alternative for larger indexes with less tenants (on-prem)
func createFileIndex(path string) (bleve.Index, string, error) {
indexPath := filepath.Join(path, uuid.New().String())
index, err := bleve.New(indexPath, mappings)
if err != nil {
golog.Fatalf("Failed to create index: %v", err)
}
return index, indexPath, err
}
// faster indexing when there are many tenants with smaller batches (cloud)
func createInMemoryIndex() (bleve.Index, string, error) {
index, err := bleve.NewMemOnly(mappings)
return index, "", err
}
// TODO - fetch from api
func fetchResourceTypes() []*ListOptions {
items := []*ListOptions{}

@ -3,7 +3,6 @@ package resource
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"testing"
@ -22,33 +21,24 @@ func TestIndexBatch(t *testing.T) {
t.Fatal(err)
}
tmpdir := os.TempDir() + "testindexbatch"
defer func() {
err = os.RemoveAll(tmpdir)
if err != nil {
t.Fatal(err)
}
}()
index := &Index{
tracer: trace,
shards: make(map[string]Shard),
log: log.New("unifiedstorage.search.index"),
opts: Opts{
IndexDir: tmpdir,
ListLimit: 10000,
ListLimit: 5000,
Workers: 10,
BatchSize: 10000,
BatchSize: 1000,
},
}
ctx := context.Background()
startAll := time.Now()
ns := namespaces()
// simulate 10 List calls
for i := 0; i < 10; i++ {
list := &ListResponse{Items: loadTestItems(strconv.Itoa(i))}
list := &ListResponse{Items: loadTestItems(strconv.Itoa(i), ns)}
start := time.Now()
_, err = index.AddToBatches(ctx, list)
if err != nil {
@ -59,12 +49,15 @@ func TestIndexBatch(t *testing.T) {
}
// index all batches for each shard/tenant
err = index.IndexBatches(ctx, 1, namespaces)
err = index.IndexBatches(ctx, 1, ns)
if err != nil {
t.Fatal(err)
}
elapsed := time.Since(startAll)
fmt.Println("Total Time elapsed:", elapsed)
assert.Equal(t, 3, len(index.shards))
assert.Equal(t, len(ns), len(index.shards))
total, err := index.Count()
if err != nil {
@ -74,7 +67,7 @@ func TestIndexBatch(t *testing.T) {
assert.Equal(t, uint64(100000), total)
}
func loadTestItems(uid string) []*ResourceWrapper {
func loadTestItems(uid string, tenants []string) []*ResourceWrapper {
resource := `{
"kind": "<kind>",
"title": "test",
@ -94,23 +87,26 @@ func loadTestItems(uid string) []*ResourceWrapper {
for i := 0; i < 10000; i++ {
res := strings.Replace(resource, "<uid>", strconv.Itoa(i)+uid, 1)
// shuffle kinds
kind := namespaces[rand.Intn(len(kinds))]
kind := kinds[rand.Intn(len(kinds))]
res = strings.Replace(res, "<kind>", kind, 1)
// shuffle namespaces
ns := namespaces[rand.Intn(len(namespaces))]
ns := tenants[rand.Intn(len(tenants))]
res = strings.Replace(res, "<ns>", ns, 1)
items = append(items, &ResourceWrapper{Value: []byte(res)})
}
return items
}
var namespaces = []string{
"tenant1",
"tenant2",
"tenant3",
}
var kinds = []string{
"playlist",
"folder",
}
// simulate many tenants ( cloud )
func namespaces() []string {
ns := []string{}
for i := 0; i < 1000; i++ {
ns = append(ns, "tenant"+strconv.Itoa(i))
}
return ns
}

Loading…
Cancel
Save