diff --git a/pkg/services/searchV2/bluge.go b/pkg/services/searchV2/bluge.go index c546cf00565..7c50005fc9f 100644 --- a/pkg/services/searchV2/bluge.go +++ b/pkg/services/searchV2/bluge.go @@ -42,11 +42,31 @@ func initIndex(dashboards []dashboard, logger log.Logger) (*bluge.Reader, *bluge } // Not closing Writer here since we use it later while processing dashboard change events. - batch := bluge.NewBatch() - start := time.Now() label := start + batch := bluge.NewBatch() + + // In order to reduce memory usage while initial indexing we are limiting + // the size of batch here. + docsInBatch := 0 + maxBatchSize := 100 + + flushIfRequired := func(force bool) error { + docsInBatch++ + needFlush := force || (maxBatchSize > 0 && docsInBatch >= maxBatchSize) + if !needFlush { + return nil + } + err := writer.Batch(batch) + if err != nil { + return err + } + docsInBatch = 0 + batch.Reset() + return nil + } + // First index the folders to construct folderIdLookup. folderIdLookup := make(map[int64]string, 50) for _, dash := range dashboards { @@ -55,6 +75,9 @@ func initIndex(dashboards []dashboard, logger log.Logger) (*bluge.Reader, *bluge } doc := getFolderDashboardDoc(dash) batch.Insert(doc) + if err := flushIfRequired(false); err != nil { + return nil, nil, err + } uid := dash.uid if uid == "" { uid = "general" @@ -71,14 +94,23 @@ func initIndex(dashboards []dashboard, logger log.Logger) (*bluge.Reader, *bluge location := folderUID doc := getNonFolderDashboardDoc(dash, location) batch.Insert(doc) + if err := flushIfRequired(false); err != nil { + return nil, nil, err + } // Index each panel in dashboard. location += "/" + dash.uid docs := getDashboardPanelDocs(dash, location) for _, panelDoc := range docs { batch.Insert(panelDoc) + if err := flushIfRequired(false); err != nil { + return nil, nil, err + } } } + if err := flushIfRequired(true); err != nil { + return nil, nil, err + } logger.Info("Finish inserting docs into batch", "elapsed", time.Since(label)) label = time.Now() diff --git a/pkg/services/searchV2/index.go b/pkg/services/searchV2/index.go index 21711a438d3..878943d74bf 100644 --- a/pkg/services/searchV2/index.go +++ b/pkg/services/searchV2/index.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "fmt" + "io/ioutil" + "os" + "runtime" "strconv" "strings" "sync" @@ -79,13 +82,10 @@ func (i *dashboardIndex) run(ctx context.Context) error { lastEventID = lastEvent.Id } - // Build on start for orgID 1 but keep lazy for others. - started := time.Now() - numDashboards, err := i.buildOrgIndex(ctx, 1) + err = i.buildInitialIndex(ctx) if err != nil { - return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err) + return fmt.Errorf("can't build initial dashboard search index: %w", err) } - i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards) for { select { @@ -111,6 +111,93 @@ func (i *dashboardIndex) run(ctx context.Context) error { } } +func (i *dashboardIndex) buildInitialIndex(ctx context.Context) error { + memCtx, memCancel := context.WithCancel(ctx) + if os.Getenv("GF_SEARCH_DEBUG") != "" { + go i.debugMemStats(memCtx, 200*time.Millisecond) + } + + // Build on start for orgID 1 but keep lazy for others. + started := time.Now() + numDashboards, err := i.buildOrgIndex(ctx, 1) + if err != nil { + memCancel() + return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err) + } + i.logger.Info("Indexing for main org finished", "mainOrgIndexElapsed", time.Since(started), "numDashboards", numDashboards) + memCancel() + + if os.Getenv("GF_SEARCH_DEBUG") != "" { + // May help to estimate size of index when introducing changes. Though it's not a direct + // match to a memory consumption, but at least make give some relative difference understanding. + // Moreover, changes in indexing can cause additional memory consumption upon initial index build + // which is not reflected here. + i.reportSizeOfIndexDiskBackup(1) + } + return nil +} + +func (i *dashboardIndex) debugMemStats(ctx context.Context, frequency time.Duration) { + var maxHeapInuse uint64 + var maxSys uint64 + + captureMemStats := func() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + if m.HeapInuse > maxHeapInuse { + maxHeapInuse = m.HeapInuse + } + if m.Sys > maxSys { + maxSys = m.Sys + } + } + + captureMemStats() + + for { + select { + case <-ctx.Done(): + i.logger.Warn("Memory stats during indexing", "maxHeapInUse", formatBytes(maxHeapInuse), "maxSys", formatBytes(maxSys)) + return + case <-time.After(frequency): + captureMemStats() + } + } +} + +func (i *dashboardIndex) reportSizeOfIndexDiskBackup(orgID int64) { + reader, _ := i.getOrgReader(orgID) + + // create a temp directory to store the index + tmpDir, err := ioutil.TempDir("", "grafana.dashboard_index") + if err != nil { + i.logger.Error("can't create temp dir", "error", err) + return + } + defer func() { + err := os.RemoveAll(tmpDir) + if err != nil { + i.logger.Error("can't remove temp dir", "error", err, "tmpDir", tmpDir) + return + } + }() + + cancel := make(chan struct{}) + err = reader.Backup(tmpDir, cancel) + if err != nil { + i.logger.Error("can't create index disk backup", "error", err) + return + } + + size, err := dirSize(tmpDir) + if err != nil { + i.logger.Error("can't calculate dir size", "error", err) + return + } + + i.logger.Warn("Size of index disk backup", "size", formatBytes(uint64(size))) +} + func (i *dashboardIndex) buildOrgIndex(ctx context.Context, orgID int64) (int, error) { started := time.Now() ctx, cancel := context.WithTimeout(ctx, time.Minute) diff --git a/pkg/services/searchV2/utils.go b/pkg/services/searchV2/utils.go new file mode 100644 index 00000000000..a5697e94940 --- /dev/null +++ b/pkg/services/searchV2/utils.go @@ -0,0 +1,39 @@ +package searchV2 + +import ( + "fmt" + "math" + "os" + "path/filepath" +) + +func dirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + size += info.Size() + } + return err + }) + return size, err +} + +func logN(n, b float64) float64 { + return math.Log(n) / math.Log(b) +} + +// Slightly modified function from https://github.com/dustin/go-humanize (MIT). +func formatBytes(numBytes uint64) string { + base := 1024.0 + sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"} + if numBytes < 10 { + return fmt.Sprintf("%d B", numBytes) + } + e := math.Floor(logN(float64(numBytes), base)) + suffix := sizes[int(e)] + val := math.Floor(float64(numBytes)/math.Pow(base, e)*10+0.5) / 10 + return fmt.Sprintf("%.1f %s", val, suffix) +}