The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
grafana/pkg/services/pluginsintegration/angulardetectorsprovider/dynamic.go

274 lines
9.2 KiB

package angulardetectorsprovider
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/plugins/config"
"github.com/grafana/grafana/pkg/plugins/log"
"github.com/grafana/grafana/pkg/plugins/manager/loader/angular/angulardetector"
"github.com/grafana/grafana/pkg/services/pluginsintegration/angularpatternsstore"
)
const defaultCacheTTL = time.Hour * 1
3 years ago
// backgroundJob implements the dynamic angular detectors provider job that is periodically executed in the background.
3 years ago
type backgroundJob interface {
3 years ago
// runBackgroundJob updates the dynamic angular detectors from a source.
runBackgroundJob(ctx context.Context)
3 years ago
}
// Dynamic is an angulardetector.DetectorsProvider that calls GCOM to get Angular detection patterns,
3 years ago
// converts them to detectors and caches them for all future calls.
// It also provides a background service that will periodically refresh the patterns from GCOM.
type Dynamic struct {
3 years ago
log log.Logger
httpClient *http.Client
baseURL string
3 years ago
// store is the underlying angular patterns store used as a cache.
store angularpatternsstore.Service
// initialRestoreDone is a channel that will be closed when the first restore from db is done by the
// background service. It can be used to wait for the first restore to be done by reading a value from this channel.
initialRestoreDone chan struct{}
3 years ago
// detectors contains the cached angular detectors, which are created from the remote angular patterns.
// Use setDetectors and ProvideDetectors to write/read this value.
detectors []angulardetector.AngularDetector
3 years ago
3 years ago
// mux is the mutex used to read/write the cached detectors in a concurrency-safe way.
mux sync.RWMutex
// backgroundJob is the implementation of the background job. This is periodically invoked by Run().
backgroundJob backgroundJob
// backgroundJobInterval is the interval between the periodic background job calls.
backgroundJobInterval time.Duration
}
3 years ago
func ProvideDynamic(cfg *config.Cfg, store angularpatternsstore.Service) (*Dynamic, error) {
// TODO: standardize gcom client
cl, err := httpclient.New()
if err != nil {
return nil, fmt.Errorf("httpclient new: %w", err)
}
3 years ago
d := &Dynamic{
3 years ago
log: log.New("plugins.angulardetector.gcom"),
store: store,
httpClient: cl,
baseURL: cfg.GrafanaComURL,
backgroundJobInterval: defaultCacheTTL,
initialRestoreDone: make(chan struct{}),
3 years ago
}
3 years ago
// By default, use ourselves as backgroundJob
d.backgroundJob = d
3 years ago
return d, nil
}
3 years ago
// patternsToDetectors converts a slice of gcomPattern into a slice of angulardetector.AngularDetector, by calling
// angularDetector() on each gcomPattern.
func (d *Dynamic) patternsToDetectors(patterns GCOMPatterns) ([]angulardetector.AngularDetector, error) {
var finalErr error
detectors := make([]angulardetector.AngularDetector, 0, len(patterns))
for _, pattern := range patterns {
ad, err := pattern.angularDetector()
if err != nil {
// Fail silently in case of an errUnknownPatternType.
// This allows us to introduce new pattern types without breaking old Grafana versions
if errors.Is(err, errUnknownPatternType) {
d.log.Debug("Unknown angular pattern", "name", pattern.Name, "type", pattern.Type, "error", err)
continue
}
// Other error, do not ignore it
finalErr = errors.Join(finalErr, err)
}
detectors = append(detectors, ad)
}
if finalErr != nil {
return nil, finalErr
}
return detectors, nil
}
// fetch fetches the angular patterns from GCOM and returns them as GCOMPatterns.
// Call detectors() on the returned value to get the corresponding detectors.
func (d *Dynamic) fetch(ctx context.Context) (GCOMPatterns, error) {
st := time.Now()
reqURL, err := url.JoinPath(d.baseURL, gcomAngularPatternsPath)
if err != nil {
return nil, fmt.Errorf("url joinpath: %w", err)
}
d.log.Debug("Fetching dynamic angular detection patterns", "url", reqURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
if err != nil {
return nil, fmt.Errorf("new request with context: %w", err)
}
resp, err := d.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http do: %w", err)
}
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
d.log.Error("Response body close error", "error", err)
}
}()
var out GCOMPatterns
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("json decode: %w", err)
}
d.log.Debug("Fetched dynamic angular detection patterns", "patterns", len(out), "duration", time.Since(st))
return out, nil
}
// fetchAndStoreDetectors fetches the patterns from GCOM, converts them into detectors, stores the new patterns into
// the store and returns the detectors. If the patterns cannot be converted to detectors, the store is not altered.
// The function returns the resulting detectors.
func (d *Dynamic) fetchAndStoreDetectors(ctx context.Context) ([]angulardetector.AngularDetector, error) {
// Fetch patterns from GCOM
patterns, err := d.fetch(ctx)
if err != nil {
return nil, fmt.Errorf("fetch: %w", err)
}
// Convert the patterns to detectors
3 years ago
newDetectors, err := d.patternsToDetectors(patterns)
if err != nil {
return nil, fmt.Errorf("patterns convert to detectors: %w", err)
}
// Update store only if the patterns can be converted to detectors
if err := d.store.Set(ctx, patterns); err != nil {
return nil, fmt.Errorf("store set: %w", err)
}
// Return the new detectors
return newDetectors, nil
}
3 years ago
// setDetectors sets the detectors by acquiring the lock first.
func (d *Dynamic) setDetectors(newDetectors []angulardetector.AngularDetector) {
d.mux.Lock()
d.detectors = newDetectors
d.mux.Unlock()
}
// tryUpdateDetectors will attempt to fetch the patterns from GCOM, convert them to detectors,
// store the patterns in the database and update the cached detectors.
func (d *Dynamic) tryUpdateDetectors(ctx context.Context) {
st := time.Now()
d.log.Debug("Updating patterns")
defer func() {
d.log.Debug("Patterns update finished", "duration", time.Since(st))
}()
opCtx, canc := context.WithTimeout(ctx, time.Minute*1)
defer canc()
// Fetch new patterns from GCOM, store response in db and get the corresponding detectors
newDetectors, err := d.fetchAndStoreDetectors(opCtx)
if err != nil {
d.log.Error("error while updating patterns", "error", err)
return
}
// Update cached detectors
d.setDetectors(newDetectors)
}
3 years ago
// backgroundJob is the function executed periodically in the background by the background service.
// It calls tryUpdateDetectors.
3 years ago
func (d *Dynamic) runBackgroundJob(ctx context.Context) {
3 years ago
d.tryUpdateDetectors(ctx)
}
// setDetectorsFromCache sets the in-memory detectors from the patterns in the store.
func (d *Dynamic) setDetectorsFromCache(ctx context.Context) error {
var cachedPatterns GCOMPatterns
rawCached, err := d.store.Get(ctx)
switch {
case errors.Is(err, angularpatternsstore.ErrNoCachedValue):
// Swallow ErrNoCachedValue without changing cache
return nil
case err == nil:
3 years ago
// Try to unmarshal, convert to detectors and set local cache
if err := json.Unmarshal([]byte(rawCached), &cachedPatterns); err != nil {
return fmt.Errorf("json unmarshal: %w", err)
}
3 years ago
cachedDetectors, err := d.patternsToDetectors(cachedPatterns)
if err != nil {
return fmt.Errorf("convert to detectors: %w", err)
}
d.setDetectors(cachedDetectors)
return nil
default:
// Other error
return fmt.Errorf("get cached value: %w", err)
}
}
3 years ago
// notifyInitialRestoreDone sets the initial restore as "done", and will unblock all goroutines waiting
// for the initial restore to be completed.
3 years ago
func (d *Dynamic) notifyInitialRestoreDone() {
close(d.initialRestoreDone)
}
// Run is the function implementing the background service and updates the detectors periodically.
func (d *Dynamic) Run(ctx context.Context) error {
// Set initial value by restoring cache
opCtx, canc := context.WithTimeout(ctx, time.Minute*1)
if err := d.setDetectorsFromCache(opCtx); err != nil {
d.log.Warn("Could not set detectors from cache, ignoring cache", "error", err)
}
canc()
3 years ago
d.notifyInitialRestoreDone()
// Determine when next run is, and check if we should run immediately
lastUpdate, err := d.store.GetLastUpdated(ctx)
if err != nil {
return fmt.Errorf("get last updated: %w", err)
}
3 years ago
nextRunUntil := time.Until(lastUpdate.Add(d.backgroundJobInterval))
if nextRunUntil <= 0 {
3 years ago
// Do first run immediately
3 years ago
d.backgroundJob.runBackgroundJob(ctx)
nextRunUntil = d.backgroundJobInterval
}
// Keep running periodically
ticker := time.NewTicker(nextRunUntil)
defer ticker.Stop()
for {
select {
case <-ticker.C:
3 years ago
d.backgroundJob.runBackgroundJob(ctx)
// Restore default TTL if we run with a shorter interval the first time
3 years ago
ticker.Reset(d.backgroundJobInterval)
case <-ctx.Done():
return ctx.Err()
}
}
}
// ProvideDetectors returns the cached detectors. It returns an empty slice if there's no value.
func (d *Dynamic) ProvideDetectors() []angulardetector.AngularDetector {
3 years ago
// Block until channel is closed, which is done after the restore from db is done.
<-d.initialRestoreDone
d.mux.RLock()
r := d.detectors
d.mux.RUnlock()
return r
}