Unified storage: Reconstruct index in the background every 24h (#106422)

pull/106218/head^2
Stephanie Hingtgen 1 month ago committed by GitHub
parent 1e41c07920
commit 5135d5c87d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      pkg/setting/setting.go
  2. 2
      pkg/setting/setting_unified_storage.go
  3. 100
      pkg/storage/unified/resource/search.go
  4. 3
      pkg/storage/unified/resource/server.go
  5. 9
      pkg/storage/unified/search/options.go

@ -556,6 +556,7 @@ type Cfg struct {
IndexMaxBatchSize int
IndexFileThreshold int
IndexMinCount int
IndexRebuildInterval time.Duration
EnableSharding bool
MemberlistBindAddr string
MemberlistAdvertiseAddr string

@ -63,6 +63,8 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
cfg.InstanceID = section.Key("instance_id").String()
cfg.IndexFileThreshold = section.Key("index_file_threshold").MustInt(10)
cfg.IndexMinCount = section.Key("index_min_count").MustInt(1)
// default to 24 hours because usage insights summarizes the data every 24 hours
cfg.IndexRebuildInterval = section.Key("index_rebuild_interval").MustDuration(24 * time.Hour)
cfg.SprinklesApiServer = section.Key("sprinkles_api_server").String()
cfg.SprinklesApiServerPageLimit = section.Key("sprinkles_api_server_page_limit").MustInt(100)
cfg.CACertPath = section.Key("ca_cert_path").String()

@ -121,6 +121,9 @@ type searchSupport struct {
// testing
clientIndexEventsChan chan *IndexEvent
// periodic rebuilding of the indexes to keep usage insights up to date
rebuildInterval time.Duration
}
var (
@ -153,6 +156,7 @@ func newSearchSupport(opts SearchOptions, storage StorageBackend, access types.A
clientIndexEventsChan: opts.IndexEventsChan,
indexEventsChan: make(chan *IndexEvent),
indexQueueProcessors: make(map[string]*indexQueueProcessor),
rebuildInterval: opts.RebuildInterval,
}
info, err := opts.Resources.GetDocumentBuilders()
@ -377,34 +381,52 @@ func (s *searchSupport) GetStats(ctx context.Context, req *resourcepb.ResourceSt
return rsp, nil
}
// init is called during startup. any failure will block startup and continued execution
func (s *searchSupport) init(ctx context.Context) error {
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
defer span.End()
start := time.Now().Unix()
func (s *searchSupport) buildIndexes(ctx context.Context, rebuild bool) (int, error) {
totalBatchesIndexed := 0
group := errgroup.Group{}
group.SetLimit(s.initWorkers)
stats, err := s.storage.GetResourceStats(ctx, "", s.initMinSize)
if err != nil {
return err
return 0, err
}
for _, info := range stats {
// only periodically rebuild the dashboard index, specifically to update the usage insights data
if rebuild && info.Resource != dashboardv1.DASHBOARD_RESOURCE {
continue
}
group.Go(func() error {
s.log.Debug("initializing search index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource)
if rebuild {
// we need to clear the cache to make sure we get the latest usage insights data
s.builders.clearNamespacedCache(info.NamespacedResource)
}
s.log.Debug("building index", "namespace", info.Namespace, "group", info.Group, "resource", info.Resource)
totalBatchesIndexed++
_, _, err = s.build(ctx, info.NamespacedResource, info.Count, info.ResourceVersion)
_, _, err := s.build(ctx, info.NamespacedResource, info.Count, info.ResourceVersion)
return err
})
}
err = group.Wait()
if err != nil {
return totalBatchesIndexed, err
}
return totalBatchesIndexed, nil
}
func (s *searchSupport) init(ctx context.Context) error {
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
defer span.End()
start := time.Now().Unix()
totalBatchesIndexed, err := s.buildIndexes(ctx, false)
if err != nil {
return err
}
span.AddEvent("namespaces indexed", trace.WithAttributes(attribute.Int("namespaced_indexed", totalBatchesIndexed)))
// Now start listening for new events
@ -428,6 +450,12 @@ func (s *searchSupport) init(ctx context.Context) error {
go s.monitorIndexEvents(ctx)
// since usage insights is not in unified storage, we need to periodically rebuild the index
// to make sure these data points are up to date.
if s.rebuildInterval > 0 {
go s.startPeriodicRebuild(watchctx)
}
end := time.Now().Unix()
s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs())
if s.indexMetrics != nil {
@ -508,6 +536,54 @@ func (s *searchSupport) monitorIndexEvents(ctx context.Context) {
}
}
func (s *searchSupport) startPeriodicRebuild(ctx context.Context) {
ticker := time.NewTicker(s.rebuildInterval)
defer ticker.Stop()
s.log.Info("starting periodic index rebuild", "interval", s.rebuildInterval)
for {
select {
case <-ctx.Done():
s.log.Info("stopping periodic index rebuild due to context cancellation")
return
case <-ticker.C:
s.log.Info("starting periodic index rebuild")
if err := s.rebuildDashboardIndexes(ctx); err != nil {
s.log.Error("error during periodic index rebuild", "error", err)
} else {
s.log.Info("periodic index rebuild completed successfully")
}
}
}
}
func (s *searchSupport) rebuildDashboardIndexes(ctx context.Context) error {
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"RebuildDashboardIndexes")
defer span.End()
start := time.Now()
s.log.Info("rebuilding all search indexes")
totalBatchesIndexed, err := s.buildIndexes(ctx, true)
if err != nil {
return fmt.Errorf("failed to rebuild dashboard indexes: %w", err)
}
end := time.Now()
duration := end.Sub(start)
s.log.Info("completed rebuilding all dashboard search indexes",
"duration", duration,
"rebuilt_indexes", totalBatchesIndexed,
"total_docs", s.search.TotalDocs())
if s.indexMetrics != nil {
s.indexMetrics.IndexCreationTime.WithLabelValues().Observe(duration.Seconds())
}
return nil
}
func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedResource) (ResourceIndex, error) {
if s == nil || s.search == nil {
return nil, fmt.Errorf("search is not configured properly (missing unifiedStorageSearch feature toggle?)")
@ -770,3 +846,9 @@ func (s *searchSupport) getOrCreateIndexQueueProcessor(index ResourceIndex, nsr
s.indexQueueProcessors[key] = indexQueueProcessor
return indexQueueProcessor, nil
}
func (s *builderCache) clearNamespacedCache(key NamespacedResource) {
s.mu.Lock()
defer s.mu.Unlock()
s.ns.Remove(key)
}

@ -158,6 +158,9 @@ type SearchOptions struct {
// Channel to watch for index events (for testing)
IndexEventsChan chan *IndexEvent
// Interval for periodic index rebuilds (0 disables periodic rebuilds)
RebuildInterval time.Duration
}
type ResourceServerOptions struct {

@ -33,10 +33,11 @@ func NewSearchOptions(features featuremgmt.FeatureToggles, cfg *setting.Cfg, tra
}
return resource.SearchOptions{
Backend: bleve,
Resources: docs,
WorkerThreads: cfg.IndexWorkers,
InitMinCount: cfg.IndexMinCount,
Backend: bleve,
Resources: docs,
WorkerThreads: cfg.IndexWorkers,
InitMinCount: cfg.IndexMinCount,
RebuildInterval: cfg.IndexRebuildInterval,
}, nil
}
return resource.SearchOptions{}, nil

Loading…
Cancel
Save