diff --git a/CHANGELOG.md b/CHANGELOG.md index 109115168a..773f54a99c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5361](https://github.com/grafana/loki/pull/5361) **ctovena**: Add usage report to grafana.com. * [5289](https://github.com/grafana/loki/pull/5289) **ctovena**: Fix deduplication bug in queries when mutating labels. * [5302](https://github.com/grafana/loki/pull/5302) **MasslessParticle** Update azure blobstore client to use new sdk. * [5243](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update Promtail to support duration string formats. diff --git a/clients/pkg/promtail/targets/cloudflare/target.go b/clients/pkg/promtail/targets/cloudflare/target.go index 806eefabfd..ad097af45d 100644 --- a/clients/pkg/promtail/targets/cloudflare/target.go +++ b/clients/pkg/promtail/targets/cloudflare/target.go @@ -109,11 +109,11 @@ func (t *Target) start() { end = maxEnd } start := end.Add(-time.Duration(t.config.PullRange)) - + requests := splitRequests(start, end, t.config.Workers) // Use background context for workers as we don't want to cancel half way through. // In case of errors we stop the target, each worker has it's own retry logic. - if err := concurrency.ForEach(context.Background(), splitRequests(start, end, t.config.Workers), t.config.Workers, func(ctx context.Context, job interface{}) error { - request := job.(pullRequest) + if err := concurrency.ForEachJob(context.Background(), len(requests), t.config.Workers, func(ctx context.Context, idx int) error { + request := requests[idx] return t.pull(ctx, request.start, request.end) }); err != nil { level.Error(t.logger).Log("msg", "failed to pull logs", "err", err, "start", start, "end", end) @@ -229,9 +229,9 @@ type pullRequest struct { end time.Time } -func splitRequests(start, end time.Time, workers int) []interface{} { +func splitRequests(start, end time.Time, workers int) []pullRequest { perWorker := end.Sub(start) / time.Duration(workers) - var requests []interface{} + var requests []pullRequest for i := 0; i < workers; i++ { r := pullRequest{ start: start.Add(time.Duration(i) * perWorker), diff --git a/clients/pkg/promtail/targets/cloudflare/target_test.go b/clients/pkg/promtail/targets/cloudflare/target_test.go index 770b1be714..a06c101a99 100644 --- a/clients/pkg/promtail/targets/cloudflare/target_test.go +++ b/clients/pkg/promtail/targets/cloudflare/target_test.go @@ -251,26 +251,26 @@ func Test_splitRequests(t *testing.T) { tests := []struct { start time.Time end time.Time - want []interface{} + want []pullRequest }{ // perfectly divisible { time.Unix(0, 0), time.Unix(0, int64(time.Minute)), - []interface{}{ - pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))}, - pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))}, - pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))}, + []pullRequest{ + {start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))}, + {start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))}, + {start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))}, }, }, // not divisible { time.Unix(0, 0), time.Unix(0, int64(time.Minute+1)), - []interface{}{ - pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))}, - pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))}, - pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))}, + []pullRequest{ + {start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))}, + {start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))}, + {start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))}, }, }, } @@ -279,11 +279,11 @@ func Test_splitRequests(t *testing.T) { got := splitRequests(tt.start, tt.end, 3) if !assert.Equal(t, tt.want, got) { for i := range got { - if !assert.Equal(t, tt.want[i].(pullRequest).start, got[i].(pullRequest).start) { - t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].(pullRequest).start.UnixNano(), got[i].(pullRequest).start.UnixNano()) + if !assert.Equal(t, tt.want[i].start, got[i].start) { + t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].start.UnixNano(), got[i].start.UnixNano()) } - if !assert.Equal(t, tt.want[i].(pullRequest).end, got[i].(pullRequest).end) { - t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].(pullRequest).end.UnixNano(), got[i].(pullRequest).end.UnixNano()) + if !assert.Equal(t, tt.want[i].end, got[i].end) { + t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].end.UnixNano(), got[i].end.UnixNano()) } } } diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index c12e60bb4c..1db281a9a2 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -164,6 +164,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set # If a more specific configuration is given in other sections, # the related configuration within this section will be ignored. [common: ] + +# Configuration for usage report +[usage_report: ] ``` ## server @@ -2496,6 +2499,16 @@ This way, one doesn't have to replicate configuration in multiple places. [ring: ] ``` +## usage_report + +This block allow to configure usage report of Loki to grafana.com + +```yaml +# Whether or not usage report should be disabled. +# CLI flag: -usage-report.disabled +[disabled: : default = false] +``` + ### storage The common `storage` block defines a common storage to be reused by different diff --git a/go.mod b/go.mod index 5b04cf6aae..753e9f7900 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/google/go-cmp v0.5.6 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 - github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 + github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/consul/api v1.12.0 @@ -105,6 +105,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.0.0-00010101000000-000000000000 github.com/google/renameio/v2 v2.0.0 + github.com/google/uuid v1.2.0 github.com/mattn/go-ieproxy v0.0.1 github.com/xdg-go/scram v1.0.2 gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0 @@ -185,7 +186,6 @@ require ( github.com/google/go-querystring v1.0.0 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect - github.com/google/uuid v1.3.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect github.com/googleapis/gnostic v0.4.1 // indirect github.com/gophercloud/gophercloud v0.24.0 // indirect diff --git a/go.sum b/go.sum index c3c220e5be..99536d2ac8 100644 --- a/go.sum +++ b/go.sum @@ -994,9 +994,8 @@ github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxeh github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww= github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -1033,8 +1032,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 h1:IXo/V2+KKLYLD724qh3uRaZgAy3BV3HdtXuSs7lb3jU= -github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= +github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 h1:R0Pw7VjouhYSS7bsMdxEidcJbCq1KUBCzPgsh7805NM= +github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0/go.mod h1:Q9WmQ9cVkrHx6g4KSl6TN+N3vEOkDZd9RtyXCHd5OPQ= github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 h1:aEOagXOTqtN9gd4jiDuP/5a81HdoJBqkVfn8WaxbsK4= github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8/go.mod h1:QAvS2C7TtQRhhv9Uf/sxD+BUhpkrPFm5jK/9MzUiDCY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2b65da90bf..563377324b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" @@ -37,7 +38,10 @@ const ( ringKey = "distributor" ) -var maxLabelCacheSize = 100000 +var ( + maxLabelCacheSize = 100000 + rfStats = usagestats.NewInt("distributor_replication_factor") +) // Config for a Distributor. type Config struct { @@ -168,6 +172,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in }), } d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor())) + rfStats.Set(int64(ingestersRing.ReplicationFactor())) servs = append(servs, d.pool) d.subservices, err = services.NewManager(servs...) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index b1b8dab657..f0e7f00c31 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -90,6 +91,12 @@ var ( // 1h -> 8hr Buckets: prometheus.LinearBuckets(1, 1, 8), }) + flushedChunksStats = usagestats.NewCounter("ingester_flushed_chunks") + flushedChunksBytesStats = usagestats.NewStatistics("ingester_flushed_chunks_bytes") + flushedChunksLinesStats = usagestats.NewStatistics("ingester_flushed_chunks_lines") + flushedChunksAgeStats = usagestats.NewStatistics("ingester_flushed_chunks_age_seconds") + flushedChunksLifespanStats = usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds") + flushedChunksUtilizationStats = usagestats.NewStatistics("ingester_flushed_chunks_utilization") ) const ( @@ -382,6 +389,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP if err := i.store.Put(ctx, wireChunks); err != nil { return err } + flushedChunksStats.Inc(int64(len(wireChunks))) // Record statistics only when actual put request did not return error. sizePerTenant := chunkSizePerTenant.WithLabelValues(userID) @@ -408,7 +416,8 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize) } - chunkUtilization.Observe(wc.Data.Utilization()) + utilization := wc.Data.Utilization() + chunkUtilization.Observe(utilization) chunkEntries.Observe(float64(numEntries)) chunkSize.Observe(compressedSize) sizePerTenant.Add(compressedSize) @@ -416,6 +425,12 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP firstTime, lastTime := cs[i].chunk.Bounds() chunkAge.Observe(time.Since(firstTime).Seconds()) chunkLifespan.Observe(lastTime.Sub(firstTime).Hours()) + + flushedChunksBytesStats.Record(compressedSize) + flushedChunksLinesStats.Record(float64(numEntries)) + flushedChunksUtilizationStats.Record(utilization) + flushedChunksAgeStats.Record(time.Since(firstTime).Seconds()) + flushedChunksLifespanStats.Record(lastTime.Sub(firstTime).Hours()) } return nil diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 3359a61205..3d93bf454e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" errUtil "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -45,12 +46,18 @@ const ( // ErrReadOnly is returned when the ingester is shutting down and a push was // attempted. -var ErrReadOnly = errors.New("Ingester is shutting down") - -var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_ingester_flush_queue_length", - Help: "The total number of series pending in the flush queue.", -}) +var ( + ErrReadOnly = errors.New("Ingester is shutting down") + + flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_flush_queue_length", + Help: "The total number of series pending in the flush queue.", + }) + compressionStats = usagestats.NewString("ingester_compression") + targetSizeStats = usagestats.NewInt("ingester_target_size_bytes") + walStats = usagestats.NewString("ingester_wal") + activeTenantsStats = usagestats.NewInt("ingester_active_tenants") +) // Config for an ingester. type Config struct { @@ -215,7 +222,12 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } - + compressionStats.Set(cfg.ChunkEncoding) + targetSizeStats.Set(int64(cfg.TargetChunkSize)) + walStats.Set("disabled") + if cfg.WAL.Enabled { + walStats.Set("enabled") + } metrics := newIngesterMetrics(registerer) i := &Ingester{ @@ -549,6 +561,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) *instance { if !ok { inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter) i.instances[instanceID] = inst + activeTenantsStats.Set(int64(len(i.instances))) } return inst } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 6bf66c3472..bf60bc9351 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/math" @@ -53,6 +54,8 @@ var ( Name: "ingester_streams_removed_total", Help: "The total number of streams removed per tenant.", }, []string{"tenant"}) + + streamsCountStats = usagestats.NewInt("ingester_streams_count") ) type instance struct { @@ -248,6 +251,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord memoryStreams.WithLabelValues(i.instanceID).Inc() i.streamsCreatedTotal.Inc() i.addTailersToNewStream(s) + streamsCountStats.Add(1) if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( @@ -288,6 +292,7 @@ func (i *instance) removeStream(s *stream) { i.index.Delete(s.labels, s.fp) i.streamsRemovedTotal.Inc() memoryStreams.WithLabelValues(i.instanceID).Dec() + streamsCountStats.Add(-1) } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 99d3a563b8..fdf9a9b3c7 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/flagext" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" @@ -47,6 +48,8 @@ var ( Buckets: prometheus.ExponentialBuckets(5, 2, 6), }) + + chunkCreatedStats = usagestats.NewCounter("ingester_chunk_created") ) var ErrEntriesExist = errors.New("duplicate push - entries already exist") @@ -203,6 +206,7 @@ func (s *stream) Push( chunk: s.NewChunk(), }) chunksCreatedTotal.Inc() + chunkCreatedStats.Inc(1) } var storedEntries []logproto.Entry @@ -379,6 +383,7 @@ func (s *stream) cutChunk(ctx context.Context) *chunkDesc { samplesPerChunk.Observe(float64(chunk.chunk.Size())) blocksPerChunk.Observe(float64(chunk.chunk.BlockCount())) chunksCreatedTotal.Inc() + chunkCreatedStats.Inc(1) s.chunks = append(s.chunks, chunkDesc{ chunk: s.NewChunk(), diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 486042eced..9ee8881f5f 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/unmarshal" @@ -39,6 +40,9 @@ var ( Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", }, []string{"tenant"}) + + bytesReceivedStats = usagestats.NewCounter("distributor_bytes_received") + linesReceivedStats = usagestats.NewCounter("distributor_lines_received") ) const applicationJSON = "application/json" @@ -130,6 +134,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete totalEntries++ entriesSize += int64(len(e.Line)) bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line)))) + bytesReceivedStats.Inc(int64(len(e.Line))) if e.Timestamp.After(mostRecentEntry) { mostRecentEntry = e.Timestamp } @@ -140,6 +145,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete if totalEntries != 0 && userID != "" { linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) } + linesReceivedStats.Inc(totalEntries) level.Debug(logger).Log( "msg", "push request parsed", diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 56b82b5091..9359b56da5 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -64,6 +65,11 @@ var ( Name: "logql_querystats_ingester_sent_lines_total", Help: "Total count of lines sent from ingesters while executing LogQL queries.", }) + + bytePerSecondMetricUsage = usagestats.NewStatistics("query_metric_bytes_per_second") + bytePerSecondLogUsage = usagestats.NewStatistics("query_log_bytes_per_second") + linePerSecondMetricUsage = usagestats.NewStatistics("query_metric_lines_per_second") + linePerSecondLogUsage = usagestats.NewStatistics("query_log_lines_per_second") ) func RecordMetrics(ctx context.Context, p Params, status string, stats logql_stats.Result, result promql_parser.Value) { @@ -125,6 +131,18 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats logql_sta chunkDownloadedTotal.WithLabelValues(status, queryType, rt). Add(float64(stats.TotalChunksDownloaded())) ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) + + recordUsageStats(queryType, stats) +} + +func recordUsageStats(queryType string, stats logql_stats.Result) { + if queryType == QueryTypeMetric { + bytePerSecondMetricUsage.Record(float64(stats.Summary.BytesProcessedPerSecond)) + linePerSecondMetricUsage.Record(float64(stats.Summary.LinesProcessedPerSecond)) + } else { + bytePerSecondLogUsage.Record(float64(stats.Summary.BytesProcessedPerSecond)) + linePerSecondLogUsage.Record(float64(stats.Summary.LinesProcessedPerSecond)) + } } func QueryType(query string) (string, error) { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index bb42d3194c..9446572f3c 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -45,6 +45,7 @@ import ( chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" "github.com/grafana/loki/pkg/tracing" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/fakeauth" util_log "github.com/grafana/loki/pkg/util/log" @@ -79,6 +80,7 @@ type Config struct { Tracing tracing.Config `yaml:"tracing"` CompactorConfig compactor.Config `yaml:"compactor,omitempty"` QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageReport usagestats.Config `yaml:"usage_report"` } // RegisterFlags registers flag. @@ -115,6 +117,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Tracing.RegisterFlags(f) c.CompactorConfig.RegisterFlags(f) c.QueryScheduler.RegisterFlags(f) + c.UsageReport.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -245,6 +248,7 @@ type Loki struct { compactor *compactor.Compactor QueryFrontEndTripperware basetripper.Tripperware queryScheduler *scheduler.Scheduler + usageReport *usagestats.Reporter clientMetrics chunk_storage.ClientMetrics @@ -481,6 +485,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(IndexGateway, t.initIndexGateway) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) + mm.RegisterModule(UsageReport, t.initUsageReport) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -492,17 +497,17 @@ func (t *Loki) setupModuleManager() error { Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides, TenantConfigs}, + Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport}, Store: {Overrides}, - Ingester: {Store, Server, MemberlistKV, TenantConfigs}, - Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs}, + Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport}, + Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, - QueryFrontend: {QueryFrontendTripperware}, - QueryScheduler: {Server, Overrides, MemberlistKV}, - Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, - TableManager: {Server}, - Compactor: {Server, Overrides, MemberlistKV}, - IndexGateway: {Server}, + QueryFrontend: {QueryFrontendTripperware, UsageReport}, + QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport}, + Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, + TableManager: {Server, UsageReport}, + Compactor: {Server, Overrides, MemberlistKV, UsageReport}, + IndexGateway: {Server, UsageReport}, IngesterQuerier: {Ring}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6f142dac55..685de4b69e 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/weaveworks/common/middleware" @@ -49,6 +50,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" serverutil "github.com/grafana/loki/pkg/util/server" @@ -82,6 +84,7 @@ const ( All string = "all" Read string = "read" Write string = "write" + UsageReport string = "usage-report" ) func (t *Loki) initServer() (services.Service, error) { @@ -749,6 +752,33 @@ func (t *Loki) initQueryScheduler() (services.Service, error) { return s, nil } +func (t *Loki) initUsageReport() (services.Service, error) { + if t.Cfg.UsageReport.Disabled { + return nil, nil + } + t.Cfg.UsageReport.Leader = false + if t.isModuleActive(Ingester) { + t.Cfg.UsageReport.Leader = true + } + usagestats.Edition("oss") + usagestats.Target(t.Cfg.Target.String()) + period, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now()) + if err != nil { + return nil, err + } + + objectClient, err := chunk_storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig.Config, t.clientMetrics) + if err != nil { + return nil, err + } + ur, err := usagestats.NewReporter(t.Cfg.UsageReport, t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore, objectClient, util_log.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + t.usageReport = ur + return ur, nil +} + func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 { return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`") diff --git a/pkg/ruler/base/ruler.go b/pkg/ruler/base/ruler.go index 46fca61517..cf8761d6f4 100644 --- a/pkg/ruler/base/ruler.go +++ b/pkg/ruler/base/ruler.go @@ -427,7 +427,7 @@ func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if r.cfg.EnableSharding { r.ring.ServeHTTP(w, req) } else { - var unshardedPage = ` + unshardedPage := ` @@ -769,9 +769,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta // Concurrently fetch rules from all rulers. Since rules are not replicated, // we need all requests to succeed. - jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses()) - err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { - addr := job.(string) + addresses := rulers.GetAddresses() + err = concurrency.ForEachJob(ctx, len(addresses), len(addresses), func(ctx context.Context, idx int) error { + addr := addresses[idx] rulerClient, err := r.clientsPool.GetClientFor(addr) if err != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 8a65211fa8..1f5ec9fac7 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" ) @@ -31,6 +32,9 @@ var ( errCurrentBoltdbShipperNon24Hours = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period") errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h") errZeroLengthConfig = errors.New("must specify at least one schema configuration") + indexTypeStats = usagestats.NewString("store_index_type") + objectTypeStats = usagestats.NewString("store_object_type") + schemaStats = usagestats.NewString("store_schema") ) // Config is the loki storage configuration @@ -125,6 +129,14 @@ type store struct { // NewStore creates a new Loki Store using configuration supplied. func NewStore(cfg Config, schemaCfg SchemaConfig, chunkStore chunk.Store, registerer prometheus.Registerer) (Store, error) { + if len(schemaCfg.Configs) != 0 { + if index := ActivePeriodConfig(schemaCfg.Configs); index != -1 && index < len(schemaCfg.Configs) { + indexTypeStats.Set(schemaCfg.Configs[index].IndexType) + objectTypeStats.Set(schemaCfg.Configs[index].ObjectType) + schemaStats.Set(schemaCfg.Configs[index].Schema) + } + } + return &store{ Store: chunkStore, cfg: cfg, diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 0aee339550..bbfeb044bf 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -52,6 +53,11 @@ const ( ringNumTokens = 1 ) +var ( + retentionEnabledStats = usagestats.NewString("compactor_retention_enabled") + defaultRetentionStats = usagestats.NewString("compactor_default_retention") +) + type Config struct { WorkingDirectory string `yaml:"working_directory"` SharedStoreType string `yaml:"shared_store"` @@ -119,6 +125,13 @@ type Compactor struct { } func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, clientMetrics storage.ClientMetrics, r prometheus.Registerer) (*Compactor, error) { + retentionEnabledStats.Set("false") + if cfg.RetentionEnabled { + retentionEnabledStats.Set("true") + } + if limits != nil { + defaultRetentionStats.Set(limits.DefaultLimits().RetentionPeriod.String()) + } if cfg.SharedStoreType == "" { return nil, errors.New("compactor shared_store_type must be specified") } diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index b9ab02611c..c1c5434725 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -264,18 +264,13 @@ func (t *table) compactFiles(files []storage.IndexFile) error { return err } - jobs := make([]interface{}, len(files)) - for i := 0; i < len(files); i++ { - jobs[i] = i - } - - return concurrency.ForEach(t.ctx, jobs, readDBsConcurrency, func(ctx context.Context, job interface{}) error { - workNum := job.(int) + return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error { + workNum := idx // skip seed file if workNum == t.seedSourceFileIdx { return nil } - fileName := files[workNum].Name + fileName := files[idx].Name downloadAt := filepath.Join(t.workingDirectory, fileName) err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(fileName), diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index 9672f2eb48..3c9d0f8c32 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -390,13 +390,8 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind downloadedFiles := make([]string, 0, len(files)) downloadedFilesMtx := sync.Mutex{} - jobs := make([]interface{}, len(files)) - for i := 0; i < len(files); i++ { - jobs[i] = i - } - - err := concurrency.ForEach(ctx, jobs, maxDownloadConcurrency, func(ctx context.Context, job interface{}) error { - fileName := files[job.(int)].Name + err := concurrency.ForEachJob(ctx, len(files), maxDownloadConcurrency, func(ctx context.Context, idx int) error { + fileName := files[idx].Name err := t.downloadFileFromStorage(ctx, fileName, t.cacheLocation) if err != nil { if t.baseIndexSet.IsFileNotFoundErr(err) { @@ -412,7 +407,6 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind return nil }) - if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index ec334eb63a..ca427a633c 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -308,8 +308,8 @@ func (t *Table) EnsureQueryReadiness(ctx context.Context) error { // downloadUserIndexes downloads user specific index files concurrently. func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error { - return concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), maxDownloadConcurrency, func(ctx context.Context, userID interface{}) error { - indexSet, err := t.getOrCreateIndexSet(userID.(string)) + return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error { + indexSet, err := t.getOrCreateIndexSet(userIDs[idx]) if err != nil { return err } diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go new file mode 100644 index 0000000000..af57c163f8 --- /dev/null +++ b/pkg/usagestats/reporter.go @@ -0,0 +1,266 @@ +package usagestats + +import ( + "bytes" + "context" + "flag" + "io" + "math" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/google/uuid" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/util/build" +) + +const ( + // File name for the cluster seed file. + ClusterSeedFileName = "loki_cluster_seed.json" + // attemptNumber how many times we will try to read a corrupted cluster seed before deleting it. + attemptNumber = 4 + // seedKey is the key for the cluster seed to use with the kv store. + seedKey = "usagestats_token" +) + +var ( + reportCheckInterval = time.Minute + reportInterval = 1 * time.Hour +) + +type Config struct { + Disabled bool `yaml:"disabled"` + Leader bool `yaml:"-"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.Disabled, "usage-report.disabled", false, "Disable anonymous usage reporting.") +} + +type Reporter struct { + kvClient kv.Client + logger log.Logger + objectClient chunk.ObjectClient + reg prometheus.Registerer + + services.Service + + conf Config + cluster *ClusterSeed + lastReport time.Time +} + +func NewReporter(config Config, kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) { + if config.Disabled { + return nil, nil + } + kvClient, err := kv.NewClient(kvConfig, JSONCodec, kv.RegistererWithKVName(reg, "usagestats"), logger) + if err != nil { + return nil, err + } + r := &Reporter{ + kvClient: kvClient, + logger: logger, + objectClient: objectClient, + conf: config, + reg: reg, + } + r.Service = services.NewBasicService(nil, r.running, nil) + return r, nil +} + +func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { + // Try to become leader via the kv client + for backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: time.Minute, + MaxRetries: 0, + }); ; backoff.Ongoing() { + // create a new cluster seed + seed := ClusterSeed{ + UID: uuid.NewString(), + PrometheusVersion: build.GetVersion(), + CreatedAt: time.Now(), + } + if err := rep.kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { + // The key is already set, so we don't need to do anything + if in != nil { + if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID { + seed = *kvSeed + return nil, false, nil + } + } + return seed, true, nil + }); err != nil { + level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + continue + } + // Fetch the remote cluster seed. + remoteSeed, err := rep.fetchSeed(ctx, + func(err error) bool { + // we only want to retry if the error is not an object not found error + return !rep.objectClient.IsObjectNotFoundErr(err) + }) + if err != nil { + if rep.objectClient.IsObjectNotFoundErr(err) { + // we are the leader and we need to save the file. + if err := rep.writeSeedFile(ctx, seed); err != nil { + level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + continue + } + return &seed + } + continue + } + return remoteSeed + } +} + +func (rep *Reporter) init(ctx context.Context) { + if rep.conf.Leader { + rep.cluster = rep.initLeader(ctx) + return + } + // follower only wait for the cluster seed to be set. + // it will try forever to fetch the cluster seed. + seed, _ := rep.fetchSeed(ctx, nil) + rep.cluster = seed +} + +// fetchSeed fetches the cluster seed from the object store and try until it succeeds. +// continueFn allow you to decide if we should continue retrying. Nil means always retry +func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (*ClusterSeed, error) { + var ( + backoff = backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: time.Minute, + MaxRetries: 0, + }) + readingErr = 0 + ) + for backoff.Ongoing() { + seed, err := rep.readSeedFile(ctx) + if err != nil { + if !rep.objectClient.IsObjectNotFoundErr(err) { + readingErr++ + } + level.Debug(rep.logger).Log("msg", "failed to read cluster seed file", "err", err) + if readingErr > attemptNumber { + if err := rep.objectClient.DeleteObject(ctx, ClusterSeedFileName); err != nil { + level.Error(rep.logger).Log("msg", "failed to delete corrupted cluster seed file, deleting it", "err", err) + } + readingErr = 0 + } + if continueFn == nil || continueFn(err) { + continue + } + return nil, err + } + return seed, nil + } + return nil, backoff.Err() +} + +// readSeedFile reads the cluster seed file from the object store. +func (rep *Reporter) readSeedFile(ctx context.Context) (*ClusterSeed, error) { + reader, _, err := rep.objectClient.GetObject(ctx, ClusterSeedFileName) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + defer func() { + if err := reader.Close(); err != nil { + level.Error(rep.logger).Log("msg", "failed to close reader", "err", err) + } + }() + data, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + seed, err := JSONCodec.Decode(data) + if err != nil { + return nil, err + } + return seed.(*ClusterSeed), nil +} + +// writeSeedFile writes the cluster seed to the object store. +func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error { + data, err := JSONCodec.Encode(seed) + if err != nil { + return err + } + return rep.objectClient.PutObject(ctx, ClusterSeedFileName, bytes.NewReader(data)) +} + +// running inits the reporter seed and start sending report for every interval +func (rep *Reporter) running(ctx context.Context) error { + rep.init(ctx) + + // check every minute if we should report. + ticker := time.NewTicker(reportCheckInterval) + defer ticker.Stop() + + // find when to send the next report. + next := nextReport(reportInterval, rep.cluster.CreatedAt, time.Now()) + if rep.lastReport.IsZero() { + // if we never reported assumed it was the last interval. + rep.lastReport = next.Add(-reportInterval) + } + for { + select { + case <-ticker.C: + now := time.Now() + if !next.Equal(now) && now.Sub(rep.lastReport) < reportInterval { + continue + } + level.Debug(rep.logger).Log("msg", "reporting cluster stats", "date", time.Now()) + if err := rep.reportUsage(ctx, next); err != nil { + level.Info(rep.logger).Log("msg", "failed to report usage", "err", err) + continue + } + rep.lastReport = next + next = next.Add(reportInterval) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// reportUsage reports the usage to grafana.com. +func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error { + backoff := backoff.New(ctx, backoff.Config{ + MinBackoff: time.Second, + MaxBackoff: 30 * time.Second, + MaxRetries: 5, + }) + var errs multierror.MultiError + for backoff.Ongoing() { + if err := sendReport(ctx, rep.cluster, interval); err != nil { + level.Info(rep.logger).Log("msg", "failed to send usage report", "retries", backoff.NumRetries(), "err", err) + errs.Add(err) + backoff.Wait() + continue + } + level.Debug(rep.logger).Log("msg", "usage report sent with success") + return nil + } + return errs.Err() +} + +// nextReport compute the next report time based on the interval. +// The interval is based off the creation of the cluster seed to avoid all cluster reporting at the same time. +func nextReport(interval time.Duration, createdAt, now time.Time) time.Time { + // createdAt * (x * interval ) >= now + return createdAt.Add(time.Duration(math.Ceil(float64(now.Sub(createdAt))/float64(interval))) * interval) +} diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go new file mode 100644 index 0000000000..c3dd63c2af --- /dev/null +++ b/pkg/usagestats/reporter_test.go @@ -0,0 +1,147 @@ +package usagestats + +import ( + "context" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/kv" + jsoniter "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/storage" +) + +var metrics = storage.NewClientMetrics() + +func Test_LeaderElection(t *testing.T) { + result := make(chan *ClusterSeed, 10) + objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ + FSConfig: local.FSConfig{ + Directory: t.TempDir(), + }, + }, metrics) + require.NoError(t, err) + for i := 0; i < 3; i++ { + go func() { + r, err := NewReporter(Config{Leader: true}, kv.Config{ + Store: "inmemory", + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + r.init(context.Background()) + result <- r.cluster + }() + } + for i := 0; i < 7; i++ { + go func() { + r, err := NewReporter(Config{Leader: false}, kv.Config{ + Store: "inmemory", + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + r.init(context.Background()) + result <- r.cluster + }() + } + + var UID []string + for i := 0; i < 10; i++ { + cluster := <-result + require.NotNil(t, cluster) + UID = append(UID, cluster.UID) + } + first := UID[0] + for _, uid := range UID { + require.Equal(t, first, uid) + } + kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, JSONCodec, prometheus.DefaultRegisterer, log.NewLogfmtLogger(os.Stdout)) + require.NoError(t, err) + // verify that the ID found is also correctly stored in the kv store and not overridden by another leader. + data, err := kvClient.Get(context.Background(), seedKey) + require.NoError(t, err) + require.Equal(t, data.(*ClusterSeed).UID, first) +} + +func Test_ReportLoop(t *testing.T) { + // stub + reportCheckInterval = 100 * time.Millisecond + reportInterval = time.Second + + totalReport := 0 + clusterIDs := []string{} + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + var received Report + totalReport++ + require.NoError(t, jsoniter.NewDecoder(r.Body).Decode(&received)) + clusterIDs = append(clusterIDs, received.ClusterID) + rw.WriteHeader(http.StatusOK) + })) + usageStatsURL = server.URL + + objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ + FSConfig: local.FSConfig{ + Directory: t.TempDir(), + }, + }, metrics) + require.NoError(t, err) + + r, err := NewReporter(Config{Leader: true}, kv.Config{ + Store: "inmemory", + }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) + require.NoError(t, err) + + r.initLeader(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + <-time.After(6 * time.Second) + cancel() + }() + require.Equal(t, context.Canceled, r.running(ctx)) + require.GreaterOrEqual(t, totalReport, 5) + first := clusterIDs[0] + for _, uid := range clusterIDs { + require.Equal(t, first, uid) + } + require.Equal(t, first, r.cluster.UID) +} + +func Test_NextReport(t *testing.T) { + fixtures := map[string]struct { + interval time.Duration + createdAt time.Time + now time.Time + + next time.Time + }{ + "createdAt aligned with interval and now": { + interval: 1 * time.Hour, + createdAt: time.Unix(0, time.Hour.Nanoseconds()), + now: time.Unix(0, 2*time.Hour.Nanoseconds()), + next: time.Unix(0, 2*time.Hour.Nanoseconds()), + }, + "createdAt aligned with interval": { + interval: 1 * time.Hour, + createdAt: time.Unix(0, time.Hour.Nanoseconds()), + now: time.Unix(0, 2*time.Hour.Nanoseconds()+1), + next: time.Unix(0, 3*time.Hour.Nanoseconds()), + }, + "createdAt not aligned": { + interval: 1 * time.Hour, + createdAt: time.Unix(0, time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()), + now: time.Unix(0, 2*time.Hour.Nanoseconds()+1), + next: time.Unix(0, 2*time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()), + }, + } + for name, f := range fixtures { + t.Run(name, func(t *testing.T) { + next := nextReport(f.interval, f.createdAt, f.now) + require.Equal(t, f.next, next) + }) + } +} diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go new file mode 100644 index 0000000000..cbf4108c44 --- /dev/null +++ b/pkg/usagestats/seed.go @@ -0,0 +1,33 @@ +package usagestats + +import ( + "time" + + jsoniter "github.com/json-iterator/go" + prom "github.com/prometheus/prometheus/web/api/v1" +) + +type ClusterSeed struct { + UID string `json:"UID"` + CreatedAt time.Time `json:"created_at"` + prom.PrometheusVersion `json:"version"` +} + +var JSONCodec = jsonCodec{} + +type jsonCodec struct{} + +// todo we need to use the default codec for the rest of the code +// currently crashing because the in-memory kvstore use a singleton. +func (jsonCodec) Decode(data []byte) (interface{}, error) { + var seed ClusterSeed + if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil { + return nil, err + } + return &seed, nil +} + +func (jsonCodec) Encode(obj interface{}) ([]byte, error) { + return jsoniter.ConfigFastest.Marshal(obj) +} +func (jsonCodec) CodecID() string { return "usagestats.jsonCodec" } diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go new file mode 100644 index 0000000000..89218f6f6f --- /dev/null +++ b/pkg/usagestats/stats.go @@ -0,0 +1,352 @@ +package usagestats + +import ( + "bytes" + "context" + "encoding/json" + "expvar" + "fmt" + "io" + "math" + "net/http" + "runtime" + "strings" + "sync" + "time" + + "github.com/grafana/loki/pkg/util/build" + + "github.com/cespare/xxhash/v2" + jsoniter "github.com/json-iterator/go" + prom "github.com/prometheus/prometheus/web/api/v1" + "go.uber.org/atomic" +) + +var ( + httpClient = http.Client{Timeout: 5 * time.Second} + usageStatsURL = "https://stats.grafana.org/loki-usage-report" + statsPrefix = "github.com/grafana/loki/" + targetKey = "target" + editionKey = "edition" +) + +// Report is the JSON object sent to the stats server +type Report struct { + ClusterID string `json:"clusterID"` + CreatedAt time.Time `json:"createdAt"` + Interval time.Time `json:"interval"` + Target string `json:"target"` + prom.PrometheusVersion `json:"version"` + Os string `json:"os"` + Arch string `json:"arch"` + Edition string `json:"edition"` + Metrics map[string]interface{} `json:"metrics"` +} + +// sendReport sends the report to the stats server +func sendReport(ctx context.Context, seed *ClusterSeed, interval time.Time) error { + report := buildReport(seed, interval) + out, err := jsoniter.MarshalIndent(report, "", " ") + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, usageStatsURL, bytes.NewBuffer(out)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + data, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("failed to send usage stats: %s body: %s", resp.Status, string(data)) + } + return nil +} + +// buildReport builds the report to be sent to the stats server +func buildReport(seed *ClusterSeed, interval time.Time) Report { + var ( + targetName string + editionName string + ) + if target := expvar.Get(statsPrefix + targetKey); target != nil { + if target, ok := target.(*expvar.String); ok { + targetName = target.Value() + } + } + if edition := expvar.Get(statsPrefix + editionKey); edition != nil { + if edition, ok := edition.(*expvar.String); ok { + editionName = edition.Value() + } + } + + return Report{ + ClusterID: seed.UID, + PrometheusVersion: build.GetVersion(), + CreatedAt: seed.CreatedAt, + Interval: interval, + Os: runtime.GOOS, + Arch: runtime.GOARCH, + Target: targetName, + Edition: editionName, + Metrics: buildMetrics(), + } +} + +// buildMetrics builds the metrics part of the report to be sent to the stats server +func buildMetrics() map[string]interface{} { + result := map[string]interface{}{ + "memstats": memstats(), + "num_cpu": runtime.NumCPU(), + "num_goroutine": runtime.NumGoroutine(), + } + expvar.Do(func(kv expvar.KeyValue) { + if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey { + return + } + var value interface{} + switch v := kv.Value.(type) { + case *expvar.Int: + value = v.Value() + case *expvar.Float: + value = v.Value() + case *expvar.String: + value = v.Value() + case *Statistics: + value = v.Value() + case *Counter: + v.updateRate() + value = v.Value() + v.reset() + case *WordCounter: + value = v.Value() + default: + value = v.String() + } + result[strings.TrimPrefix(kv.Key, statsPrefix)] = value + }) + return result +} + +func memstats() interface{} { + stats := new(runtime.MemStats) + runtime.ReadMemStats(stats) + return map[string]interface{}{ + "alloc": stats.Alloc, + "total_alloc": stats.TotalAlloc, + "sys": stats.Sys, + "heap_alloc": stats.HeapAlloc, + "heap_inuse": stats.HeapInuse, + "stack_inuse": stats.StackInuse, + "pause_total_ns": stats.PauseTotalNs, + "num_gc": stats.NumGC, + "gc_cpu_fraction": stats.GCCPUFraction, + } +} + +// NewFloat returns a new Float stats object. +func NewFloat(name string) *expvar.Float { + return expvar.NewFloat(statsPrefix + name) +} + +// NewInt returns a new Int stats object. +func NewInt(name string) *expvar.Int { + return expvar.NewInt(statsPrefix + name) +} + +// NewString returns a new String stats object. +func NewString(name string) *expvar.String { + return expvar.NewString(statsPrefix + name) +} + +// Target sets the target name. +func Target(target string) { + NewString(targetKey).Set(target) +} + +// Edition sets the edition name. +func Edition(edition string) { + NewString(editionKey).Set(edition) +} + +type Statistics struct { + min *atomic.Float64 + max *atomic.Float64 + count *atomic.Int64 + + avg *atomic.Float64 + + // require for stddev and stdvar + mean *atomic.Float64 + value *atomic.Float64 +} + +// NewStatistics returns a new Statistics object. +// Statistics object is thread-safe and compute statistics on the fly based on sample recorded. +// Available statistics are: +// - min +// - max +// - avg +// - count +// - stddev +// - stdvar +func NewStatistics(name string) *Statistics { + s := &Statistics{ + min: atomic.NewFloat64(math.Inf(0)), + max: atomic.NewFloat64(math.Inf(-1)), + count: atomic.NewInt64(0), + avg: atomic.NewFloat64(0), + mean: atomic.NewFloat64(0), + value: atomic.NewFloat64(0), + } + expvar.Publish(statsPrefix+name, s) + return s +} + +func (s *Statistics) String() string { + b, _ := json.Marshal(s.Value()) + return string(b) +} + +func (s *Statistics) Value() map[string]interface{} { + stdvar := s.value.Load() / float64(s.count.Load()) + stddev := math.Sqrt(stdvar) + min := s.min.Load() + max := s.max.Load() + result := map[string]interface{}{ + "avg": s.avg.Load(), + "count": s.count.Load(), + } + if !math.IsInf(min, 0) { + result["min"] = min + } + if !math.IsInf(max, 0) { + result["max"] = s.max.Load() + } + if !math.IsNaN(stddev) { + result["stddev"] = stddev + } + if !math.IsNaN(stdvar) { + result["stdvar"] = stdvar + } + return result +} + +func (s *Statistics) Record(v float64) { + for { + min := s.min.Load() + if min <= v { + break + } + if s.min.CAS(min, v) { + break + } + } + for { + max := s.max.Load() + if max >= v { + break + } + if s.max.CAS(max, v) { + break + } + } + for { + avg := s.avg.Load() + count := s.count.Load() + mean := s.mean.Load() + value := s.value.Load() + + delta := v - mean + newCount := count + 1 + newMean := mean + (delta / float64(newCount)) + newValue := value + (delta * (v - newMean)) + newAvg := avg + ((v - avg) / float64(newCount)) + if s.avg.CAS(avg, newAvg) && s.count.CAS(count, newCount) && s.mean.CAS(mean, newMean) && s.value.CAS(value, newValue) { + break + } + } +} + +type Counter struct { + total *atomic.Int64 + rate *atomic.Float64 + + resetTime time.Time +} + +// NewCounter returns a new Counter stats object. +func NewCounter(name string) *Counter { + c := &Counter{ + total: atomic.NewInt64(0), + rate: atomic.NewFloat64(0), + resetTime: time.Now(), + } + expvar.Publish(statsPrefix+name, c) + return c +} + +func (c *Counter) updateRate() { + total := c.total.Load() + c.rate.Store(float64(total) / time.Since(c.resetTime).Seconds()) +} + +func (c *Counter) reset() { + c.total.Store(0) + c.rate.Store(0) + c.resetTime = time.Now() +} + +func (c *Counter) Inc(i int64) { + c.total.Add(i) +} + +func (c *Counter) String() string { + b, _ := json.Marshal(c.Value()) + return string(b) +} + +func (c *Counter) Value() map[string]interface{} { + return map[string]interface{}{ + "total": c.total.Load(), + "rate": c.rate.Load(), + } +} + +type WordCounter struct { + words sync.Map + count *atomic.Int64 +} + +// NewWordCounter returns a new WordCounter stats object. +// WordCounter object is thread-safe and count the amount of word recorded. +func NewWordCounter(name string) *WordCounter { + c := &WordCounter{ + count: atomic.NewInt64(0), + words: sync.Map{}, + } + expvar.Publish(statsPrefix+name, c) + return c +} + +func (w *WordCounter) Add(word string) { + if _, loaded := w.words.LoadOrStore(xxhash.Sum64String(word), struct{}{}); !loaded { + w.count.Add(1) + } +} + +func (w *WordCounter) String() string { + b, _ := json.Marshal(w.Value()) + return string(b) +} + +func (w *WordCounter) Value() int64 { + return w.count.Load() +} diff --git a/pkg/usagestats/stats_test.go b/pkg/usagestats/stats_test.go new file mode 100644 index 0000000000..de5f5005b4 --- /dev/null +++ b/pkg/usagestats/stats_test.go @@ -0,0 +1,97 @@ +package usagestats + +import ( + "runtime" + "testing" + "time" + + "github.com/grafana/loki/pkg/util/build" + + "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/require" +) + +func Test_BuildReport(t *testing.T) { + now := time.Now() + seed := &ClusterSeed{ + UID: uuid.New().String(), + CreatedAt: now, + } + + Edition("OSS") + Target("compactor") + NewString("compression").Set("lz4") + NewInt("compression_ratio").Set(100) + NewFloat("size_mb").Set(100.1) + NewCounter("lines_written").Inc(200) + s := NewStatistics("query_throughput") + s.Record(300) + s.Record(5) + w := NewWordCounter("active_tenants") + w.Add("foo") + w.Add("bar") + w.Add("foo") + + r := buildReport(seed, now.Add(time.Hour)) + require.Equal(t, r.Arch, runtime.GOARCH) + require.Equal(t, r.Os, runtime.GOOS) + require.Equal(t, r.PrometheusVersion, build.GetVersion()) + require.Equal(t, r.Edition, "OSS") + require.Equal(t, r.Target, "compactor") + require.Equal(t, r.Metrics["num_cpu"], runtime.NumCPU()) + require.Equal(t, r.Metrics["num_goroutine"], runtime.NumGoroutine()) + require.Equal(t, r.Metrics["compression"], "lz4") + require.Equal(t, r.Metrics["compression_ratio"], int64(100)) + require.Equal(t, r.Metrics["size_mb"], 100.1) + require.Equal(t, r.Metrics["lines_written"].(map[string]interface{})["total"], int64(200)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["min"], float64(5)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["max"], float64(300)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["count"], int64(2)) + require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["avg"], float64(300+5)/2) + require.Equal(t, r.Metrics["active_tenants"], int64(2)) + + out, _ := jsoniter.MarshalIndent(r, "", " ") + t.Log(string(out)) +} + +func TestCounter(t *testing.T) { + c := NewCounter("test_counter") + c.Inc(100) + c.Inc(200) + c.Inc(300) + time.Sleep(1 * time.Second) + c.updateRate() + v := c.Value() + require.Equal(t, int64(600), v["total"]) + require.GreaterOrEqual(t, v["rate"], float64(590)) + c.reset() + require.Equal(t, int64(0), c.Value()["total"]) + require.Equal(t, float64(0), c.Value()["rate"]) +} + +func TestStatistic(t *testing.T) { + s := NewStatistics("test_stats") + s.Record(100) + s.Record(200) + s.Record(300) + v := s.Value() + require.Equal(t, float64(100), v["min"]) + require.Equal(t, float64(300), v["max"]) + require.Equal(t, int64(3), v["count"]) + require.Equal(t, float64(100+200+300)/3, v["avg"]) + require.Equal(t, float64(81.64965809277261), v["stddev"]) + require.Equal(t, float64(6666.666666666667), v["stdvar"]) +} + +func TestWordCounter(t *testing.T) { + w := NewWordCounter("test_words_count") + for i := 0; i < 100; i++ { + go func() { + w.Add("foo") + w.Add("bar") + w.Add("foo") + }() + } + require.Equal(t, int64(2), w.Value()) +} diff --git a/pkg/util/build/build.go b/pkg/util/build/build.go index 9a6b75a8bc..76ff10af81 100644 --- a/pkg/util/build/build.go +++ b/pkg/util/build/build.go @@ -4,6 +4,7 @@ import ( "runtime" "github.com/prometheus/common/version" + prom "github.com/prometheus/prometheus/web/api/v1" ) // Version information passed to Prometheus version package. @@ -27,3 +28,14 @@ func init() { version.BuildDate = BuildDate version.GoVersion = runtime.Version() } + +func GetVersion() prom.PrometheusVersion { + return prom.PrometheusVersion{ + Version: version.Version, + Revision: version.Revision, + Branch: version.Branch, + BuildUser: version.BuildUser, + BuildDate: version.BuildDate, + GoVersion: version.GoVersion, + } +} diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index d2598e0d27..81b59a5e4e 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -41,7 +41,7 @@ func NotFoundHandler(w http.ResponseWriter, r *http.Request) { func JSONError(w http.ResponseWriter, code int, message string, args ...interface{}) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(code) - json.NewEncoder(w).Encode(ErrorResponseBody{ + _ = json.NewEncoder(w).Encode(ErrorResponseBody{ Code: code, Status: "error", Message: fmt.Sprintf(message, args...), diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go index 211ae13803..8e9e48cc02 100644 --- a/pkg/util/server/error_test.go +++ b/pkg/util/server/error_test.go @@ -32,58 +32,78 @@ func Test_writeError(t *testing.T) { }{ {"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest}, {"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, - {"rpc cancelled", + { + "rpc cancelled", status.New(codes.Canceled, context.Canceled.Error()).Err(), "rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, - {"rpc cancelled multi", + http.StatusInternalServerError, + }, + { + "rpc cancelled multi", util.MultiError{status.New(codes.Canceled, context.Canceled.Error()).Err(), status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, - {"mixed context and rpc cancelled", + http.StatusInternalServerError, + }, + { + "mixed context and rpc cancelled", util.MultiError{context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: context canceled; rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, - {"mixed context, rpc cancelled and another", + http.StatusInternalServerError, + }, + { + "mixed context, rpc cancelled and another", util.MultiError{errors.New("standard error"), context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "3 errors: standard error; context canceled; rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, + http.StatusInternalServerError, + }, {"cancelled storage", promql.ErrStorage{Err: context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, {"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest}, {"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout}, {"deadline multi", util.MultiError{context.DeadlineExceeded, context.DeadlineExceeded}, ErrDeadlineExceeded, http.StatusGatewayTimeout}, {"rpc deadline", status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), ErrDeadlineExceeded, http.StatusGatewayTimeout}, - {"rpc deadline multi", - util.MultiError{status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), - status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, + { + "rpc deadline multi", + util.MultiError{ + status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), + status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), + }, ErrDeadlineExceeded, - http.StatusGatewayTimeout}, - {"mixed context and rpc deadline", + http.StatusGatewayTimeout, + }, + { + "mixed context and rpc deadline", util.MultiError{context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, ErrDeadlineExceeded, - http.StatusGatewayTimeout}, - {"mixed context, rpc deadline and another", + http.StatusGatewayTimeout, + }, + { + "mixed context, rpc deadline and another", util.MultiError{errors.New("standard error"), context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, "3 errors: standard error; context deadline exceeded; rpc error: code = DeadlineExceeded desc = context deadline exceeded", - http.StatusInternalServerError}, + http.StatusInternalServerError, + }, {"parse error", logqlmodel.ParseError{}, "parse error : ", http.StatusBadRequest}, {"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest}, {"internal", errors.New("foo"), "foo", http.StatusInternalServerError}, {"query error", chunk.ErrQueryMustContainMetricName, chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest}, - {"wrapped query error", + { + "wrapped query error", fmt.Errorf("wrapped: %w", chunk.ErrQueryMustContainMetricName), "wrapped: " + chunk.ErrQueryMustContainMetricName.Error(), - http.StatusBadRequest}, - {"multi mixed", + http.StatusBadRequest, + }, + { + "multi mixed", util.MultiError{context.Canceled, context.DeadlineExceeded}, "2 errors: context canceled; context deadline exceeded", - http.StatusInternalServerError}, + http.StatusInternalServerError, + }, } { t.Run(tt.name, func(t *testing.T) { rec := httptest.NewRecorder() WriteError(tt.err, rec) res := &ErrorResponseBody{} - json.NewDecoder(rec.Result().Body).Decode(res) + _ = json.NewDecoder(rec.Result().Body).Decode(res) require.Equal(t, tt.expectedStatus, res.Code) require.Equal(t, tt.expectedStatus, rec.Result().StatusCode) require.Equal(t, tt.expectedMsg, res.Message) diff --git a/vendor/github.com/google/uuid/null.go b/vendor/github.com/google/uuid/null.go deleted file mode 100644 index d7fcbf2865..0000000000 --- a/vendor/github.com/google/uuid/null.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2021 Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package uuid - -import ( - "bytes" - "database/sql/driver" - "encoding/json" - "fmt" -) - -var jsonNull = []byte("null") - -// NullUUID represents a UUID that may be null. -// NullUUID implements the SQL driver.Scanner interface so -// it can be used as a scan destination: -// -// var u uuid.NullUUID -// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&u) -// ... -// if u.Valid { -// // use u.UUID -// } else { -// // NULL value -// } -// -type NullUUID struct { - UUID UUID - Valid bool // Valid is true if UUID is not NULL -} - -// Scan implements the SQL driver.Scanner interface. -func (nu *NullUUID) Scan(value interface{}) error { - if value == nil { - nu.UUID, nu.Valid = Nil, false - return nil - } - - err := nu.UUID.Scan(value) - if err != nil { - nu.Valid = false - return err - } - - nu.Valid = true - return nil -} - -// Value implements the driver Valuer interface. -func (nu NullUUID) Value() (driver.Value, error) { - if !nu.Valid { - return nil, nil - } - // Delegate to UUID Value function - return nu.UUID.Value() -} - -// MarshalBinary implements encoding.BinaryMarshaler. -func (nu NullUUID) MarshalBinary() ([]byte, error) { - if nu.Valid { - return nu.UUID[:], nil - } - - return []byte(nil), nil -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (nu *NullUUID) UnmarshalBinary(data []byte) error { - if len(data) != 16 { - return fmt.Errorf("invalid UUID (got %d bytes)", len(data)) - } - copy(nu.UUID[:], data) - nu.Valid = true - return nil -} - -// MarshalText implements encoding.TextMarshaler. -func (nu NullUUID) MarshalText() ([]byte, error) { - if nu.Valid { - return nu.UUID.MarshalText() - } - - return jsonNull, nil -} - -// UnmarshalText implements encoding.TextUnmarshaler. -func (nu *NullUUID) UnmarshalText(data []byte) error { - id, err := ParseBytes(data) - if err != nil { - nu.Valid = false - return err - } - nu.UUID = id - nu.Valid = true - return nil -} - -// MarshalJSON implements json.Marshaler. -func (nu NullUUID) MarshalJSON() ([]byte, error) { - if nu.Valid { - return json.Marshal(nu.UUID) - } - - return jsonNull, nil -} - -// UnmarshalJSON implements json.Unmarshaler. -func (nu *NullUUID) UnmarshalJSON(data []byte) error { - if bytes.Equal(data, jsonNull) { - *nu = NullUUID{} - return nil // valid null UUID - } - err := json.Unmarshal(data, &nu.UUID) - nu.Valid = err == nil - return err -} diff --git a/vendor/github.com/google/uuid/uuid.go b/vendor/github.com/google/uuid/uuid.go index a57207aeb6..60d26bb50c 100644 --- a/vendor/github.com/google/uuid/uuid.go +++ b/vendor/github.com/google/uuid/uuid.go @@ -12,7 +12,6 @@ import ( "fmt" "io" "strings" - "sync" ) // A UUID is a 128 bit (16 byte) Universal Unique IDentifier as defined in RFC @@ -34,15 +33,7 @@ const ( Future // Reserved for future definition. ) -const randPoolSize = 16 * 16 - -var ( - rander = rand.Reader // random function - poolEnabled = false - poolMu sync.Mutex - poolPos = randPoolSize // protected with poolMu - pool [randPoolSize]byte // protected with poolMu -) +var rander = rand.Reader // random function type invalidLengthError struct{ len int } @@ -50,12 +41,6 @@ func (err invalidLengthError) Error() string { return fmt.Sprintf("invalid UUID length: %d", err.len) } -// IsInvalidLengthError is matcher function for custom error invalidLengthError -func IsInvalidLengthError(err error) bool { - _, ok := err.(invalidLengthError) - return ok -} - // Parse decodes s into a UUID or returns an error. Both the standard UUID // forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and // urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the @@ -264,31 +249,3 @@ func SetRand(r io.Reader) { } rander = r } - -// EnableRandPool enables internal randomness pool used for Random -// (Version 4) UUID generation. The pool contains random bytes read from -// the random number generator on demand in batches. Enabling the pool -// may improve the UUID generation throughput significantly. -// -// Since the pool is stored on the Go heap, this feature may be a bad fit -// for security sensitive applications. -// -// Both EnableRandPool and DisableRandPool are not thread-safe and should -// only be called when there is no possibility that New or any other -// UUID Version 4 generation function will be called concurrently. -func EnableRandPool() { - poolEnabled = true -} - -// DisableRandPool disables the randomness pool if it was previously -// enabled with EnableRandPool. -// -// Both EnableRandPool and DisableRandPool are not thread-safe and should -// only be called when there is no possibility that New or any other -// UUID Version 4 generation function will be called concurrently. -func DisableRandPool() { - poolEnabled = false - defer poolMu.Unlock() - poolMu.Lock() - poolPos = randPoolSize -} diff --git a/vendor/github.com/google/uuid/version4.go b/vendor/github.com/google/uuid/version4.go index 7697802e4d..86160fbd07 100644 --- a/vendor/github.com/google/uuid/version4.go +++ b/vendor/github.com/google/uuid/version4.go @@ -27,8 +27,6 @@ func NewString() string { // The strength of the UUIDs is based on the strength of the crypto/rand // package. // -// Uses the randomness pool if it was enabled with EnableRandPool. -// // A note about uniqueness derived from the UUID Wikipedia entry: // // Randomly generated UUIDs have 122 random bits. One's annual risk of being @@ -37,10 +35,7 @@ func NewString() string { // equivalent to the odds of creating a few tens of trillions of UUIDs in a // year and having one duplicate. func NewRandom() (UUID, error) { - if !poolEnabled { - return NewRandomFromReader(rander) - } - return newRandomFromPool() + return NewRandomFromReader(rander) } // NewRandomFromReader returns a UUID based on bytes read from a given io.Reader. @@ -54,23 +49,3 @@ func NewRandomFromReader(r io.Reader) (UUID, error) { uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10 return uuid, nil } - -func newRandomFromPool() (UUID, error) { - var uuid UUID - poolMu.Lock() - if poolPos == randPoolSize { - _, err := io.ReadFull(rander, pool[:]) - if err != nil { - poolMu.Unlock() - return Nil, err - } - poolPos = 0 - } - copy(uuid[:], pool[poolPos:(poolPos+16)]) - poolPos += 16 - poolMu.Unlock() - - uuid[6] = (uuid[6] & 0x0f) | 0x40 // Version 4 - uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10 - return uuid, nil -} diff --git a/vendor/github.com/grafana/dskit/backoff/backoff.go b/vendor/github.com/grafana/dskit/backoff/backoff.go index 2146f3b928..c5d4547159 100644 --- a/vendor/github.com/grafana/dskit/backoff/backoff.go +++ b/vendor/github.com/grafana/dskit/backoff/backoff.go @@ -10,9 +10,9 @@ import ( // Config configures a Backoff type Config struct { - MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level - MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level - MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries + MinBackoff time.Duration `yaml:"min_period" category:"advanced"` // start backoff at this level + MaxBackoff time.Duration `yaml:"max_period" category:"advanced"` // increase exponentially to this level + MaxRetries int `yaml:"max_retries" category:"advanced"` // give up after this many; zero means infinite retries } // RegisterFlagsWithPrefix for Config. diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go index a6740f3ac9..023be10d7a 100644 --- a/vendor/github.com/grafana/dskit/concurrency/runner.go +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/grafana/dskit/internal/math" @@ -62,45 +63,53 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. // The execution breaks on first error encountered. +// +// Deprecated: use ForEachJob instead. func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { - if len(jobs) == 0 { - return nil + return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { + return jobFunc(ctx, jobs[idx]) + }) +} + +// CreateJobsFromStrings is an utility to create jobs from an slice of strings. +// +// Deprecated: will be removed as it's not needed when using ForEachJob. +func CreateJobsFromStrings(values []string) []interface{} { + jobs := make([]interface{}, len(values)) + for i := 0; i < len(values); i++ { + jobs[i] = values[i] } + return jobs +} - // Push all jobs to a channel. - ch := make(chan interface{}, len(jobs)) - for _, job := range jobs { - ch <- job +// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. +// The execution breaks on first error encountered. +func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error { + if jobs == 0 { + return nil } - close(ch) + + // Initialise indexes with -1 so first Inc() returns index 0. + indexes := atomic.NewInt64(-1) // Start workers to process jobs. g, ctx := errgroup.WithContext(ctx) - for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ { + for ix := 0; ix < math.Min(concurrency, jobs); ix++ { g.Go(func() error { - for job := range ch { - if err := ctx.Err(); err != nil { - return err + for ctx.Err() == nil { + idx := int(indexes.Inc()) + if idx >= jobs { + return nil } - if err := jobFunc(ctx, job); err != nil { + if err := jobFunc(ctx, idx); err != nil { return err } } - - return nil + return ctx.Err() }) } // Wait until done (or context has canceled). return g.Wait() } - -// CreateJobsFromStrings is an utility to create jobs from an slice of strings. -func CreateJobsFromStrings(values []string) []interface{} { - jobs := make([]interface{}, len(values)) - for i := 0; i < len(values); i++ { - jobs[i] = values[i] - } - return jobs -} diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go index a6fa46f073..1588edc893 100644 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ b/vendor/github.com/grafana/dskit/crypto/tls/tls.go @@ -13,11 +13,11 @@ import ( // ClientConfig is the config for client TLS. type ClientConfig struct { - CertPath string `yaml:"tls_cert_path"` - KeyPath string `yaml:"tls_key_path"` - CAPath string `yaml:"tls_ca_path"` - ServerName string `yaml:"tls_server_name"` - InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` + CertPath string `yaml:"tls_cert_path" category:"advanced"` + KeyPath string `yaml:"tls_key_path" category:"advanced"` + CAPath string `yaml:"tls_ca_path" category:"advanced"` + ServerName string `yaml:"tls_server_name" category:"advanced"` + InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify" category:"advanced"` } var ( diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 094337f5d2..e7d93b64ec 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -18,16 +18,16 @@ import ( // Config for a gRPC client. type Config struct { - MaxRecvMsgSize int `yaml:"max_recv_msg_size"` - MaxSendMsgSize int `yaml:"max_send_msg_size"` - GRPCCompression string `yaml:"grpc_compression"` - RateLimit float64 `yaml:"rate_limit"` - RateLimitBurst int `yaml:"rate_limit_burst"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"` + MaxSendMsgSize int `yaml:"max_send_msg_size" category:"advanced"` + GRPCCompression string `yaml:"grpc_compression" category:"advanced"` + RateLimit float64 `yaml:"rate_limit" category:"advanced"` + RateLimitBurst int `yaml:"rate_limit_burst" category:"advanced"` - BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` + BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits" category:"advanced"` BackoffConfig backoff.Config `yaml:"backoff_config"` - TLSEnabled bool `yaml:"tls_enabled"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS tls.ClientConfig `yaml:",inline"` } diff --git a/vendor/github.com/grafana/dskit/kv/client.go b/vendor/github.com/grafana/dskit/kv/client.go index b73620dfd6..42bf559546 100644 --- a/vendor/github.com/grafana/dskit/kv/client.go +++ b/vendor/github.com/grafana/dskit/kv/client.go @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels { // The NewInMemoryKVClient returned by NewClient() is a singleton, so // that distributors and ingesters started in the same process can // find themselves. -var inmemoryStoreInit sync.Once -var inmemoryStore Client +var ( + inmemoryStoreInit sync.Once + inmemoryStore *consul.Client +) // StoreConfig is a configuration used for building single store client, either // Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep @@ -53,7 +55,7 @@ type StoreConfig struct { // where store can be consul or inmemory. type Config struct { Store string `yaml:"store"` - Prefix string `yaml:"prefix"` + Prefix string `yaml:"prefix" category:"advanced"` StoreConfig `yaml:",inline"` Mock Client `yaml:"-"` @@ -76,8 +78,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f if flagsPrefix == "" { flagsPrefix = "ring." } + + // Allow clients to override default store by setting it before calling this method. + if cfg.Store == "" { + cfg.Store = "consul" + } + f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.") - f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.") + f.StringVar(&cfg.Store, flagsPrefix+"store", cfg.Store, "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.") } // Client is a high-level client for key-value stores (such as Etcd and @@ -140,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co inmemoryStoreInit.Do(func() { inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) - client = inmemoryStore + // however we swap the codec so that we can encode different type of values. + client = inmemoryStore.WithCodec(codec) case "memberlist": kv, err := cfg.MemberlistKV() diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/vendor/github.com/grafana/dskit/kv/consul/client.go index 69219cf748..63114c547b 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/client.go +++ b/vendor/github.com/grafana/dskit/kv/consul/client.go @@ -40,11 +40,11 @@ var ( // Config to create a ConsulClient type Config struct { Host string `yaml:"host"` - ACLToken string `yaml:"acl_token"` - HTTPClientTimeout time.Duration `yaml:"http_client_timeout"` - ConsistentReads bool `yaml:"consistent_reads"` - WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit - WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1 + ACLToken string `yaml:"acl_token" category:"advanced"` + HTTPClientTimeout time.Duration `yaml:"http_client_timeout" category:"advanced"` + ConsistentReads bool `yaml:"consistent_reads" category:"advanced"` + WatchKeyRateLimit float64 `yaml:"watch_rate_limit" category:"advanced"` // Zero disables rate limit + WatchKeyBurstSize int `yaml:"watch_burst_size" category:"advanced"` // Burst when doing rate-limit, defaults to 1 // Used in tests only. MaxCasRetries int `yaml:"-"` @@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b } kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx)) - // Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value, // and next call to Get will block as expected. We handle missing value below. if err != nil { @@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter { } return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst) } + +// WithCodec Clones and changes the codec of the consul client. +func (c *Client) WithCodec(codec codec.Codec) *Client { + new := *c + new.codec = codec + return &new +} diff --git a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go index fa6944d4f5..0661fc5daa 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go +++ b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go @@ -22,9 +22,9 @@ import ( // Config for a new etcd.Client. type Config struct { Endpoints []string `yaml:"endpoints"` - DialTimeout time.Duration `yaml:"dial_timeout"` - MaxRetries int `yaml:"max_retries"` - EnableTLS bool `yaml:"tls_enabled"` + DialTimeout time.Duration `yaml:"dial_timeout" category:"advanced"` + MaxRetries int `yaml:"max_retries" category:"advanced"` + EnableTLS bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` UserName string `yaml:"username"` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index d7ad176d0e..30f0992d35 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -124,16 +124,16 @@ func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error { // KVConfig is a config for memberlist.KV type KVConfig struct { // Memberlist options. - NodeName string `yaml:"node_name"` - RandomizeNodeName bool `yaml:"randomize_node_name"` - StreamTimeout time.Duration `yaml:"stream_timeout"` - RetransmitMult int `yaml:"retransmit_factor"` - PushPullInterval time.Duration `yaml:"pull_push_interval"` - GossipInterval time.Duration `yaml:"gossip_interval"` - GossipNodes int `yaml:"gossip_nodes"` - GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"` - DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"` - EnableCompression bool `yaml:"compression_enabled"` + NodeName string `yaml:"node_name" category:"advanced"` + RandomizeNodeName bool `yaml:"randomize_node_name" category:"advanced"` + StreamTimeout time.Duration `yaml:"stream_timeout" category:"advanced"` + RetransmitMult int `yaml:"retransmit_factor" category:"advanced"` + PushPullInterval time.Duration `yaml:"pull_push_interval" category:"advanced"` + GossipInterval time.Duration `yaml:"gossip_interval" category:"advanced"` + GossipNodes int `yaml:"gossip_nodes" category:"advanced"` + GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` + DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` + EnableCompression bool `yaml:"compression_enabled" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -141,20 +141,20 @@ type KVConfig struct { // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` - MinJoinBackoff time.Duration `yaml:"min_join_backoff"` - MaxJoinBackoff time.Duration `yaml:"max_join_backoff"` - MaxJoinRetries int `yaml:"max_join_retries"` + MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"` + MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"` + MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"` AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"` - RejoinInterval time.Duration `yaml:"rejoin_interval"` + RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"` + LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). - MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"` + MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` TCPTransport TCPTransportConfig `yaml:",inline"` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index 4265a3b223..eb54518783 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -45,10 +45,10 @@ type TCPTransportConfig struct { // Timeout used when making connections to other nodes to send packet. // Zero = no timeout - PacketDialTimeout time.Duration `yaml:"packet_dial_timeout"` + PacketDialTimeout time.Duration `yaml:"packet_dial_timeout" category:"advanced"` // Timeout for writing packet data. Zero = no timeout. - PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"` + PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-"` @@ -57,7 +57,7 @@ type TCPTransportConfig struct { MetricsRegisterer prometheus.Registerer `yaml:"-"` MetricsNamespace string `yaml:"-"` - TLSEnabled bool `yaml:"tls_enabled"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` } diff --git a/vendor/github.com/grafana/dskit/kv/multi.go b/vendor/github.com/grafana/dskit/kv/multi.go index 8a3382e985..9a9c24bb83 100644 --- a/vendor/github.com/grafana/dskit/kv/multi.go +++ b/vendor/github.com/grafana/dskit/kv/multi.go @@ -16,11 +16,11 @@ import ( // MultiConfig is a configuration for MultiClient. type MultiConfig struct { - Primary string `yaml:"primary"` - Secondary string `yaml:"secondary"` + Primary string `yaml:"primary" category:"advanced"` + Secondary string `yaml:"secondary" category:"advanced"` - MirrorEnabled bool `yaml:"mirror_enabled"` - MirrorTimeout time.Duration `yaml:"mirror_timeout"` + MirrorEnabled bool `yaml:"mirror_enabled" category:"advanced"` + MirrorTimeout time.Duration `yaml:"mirror_timeout" category:"advanced"` // ConfigProvider returns channel with MultiRuntimeConfig updates. ConfigProvider func() <-chan MultiRuntimeConfig `yaml:"-"` diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 726a85430d..32775c9829 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -3,6 +3,7 @@ package ring import ( "context" "fmt" + "net/http" "sort" "sync" "time" @@ -491,3 +492,20 @@ func (l *BasicLifecycler) run(fn func() error) error { return <-errCh } } + +func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return l.store.CAS(ctx, l.ringKey, f) +} + +func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := l.store.Get(ctx, l.ringKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req) +} diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index f23f08b812..1d6c10e801 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -10,8 +10,6 @@ import ( "sort" "strings" "time" - - "github.com/go-kit/log/level" ) const pageContent = ` @@ -90,19 +88,6 @@ func init() { pageTemplate = template.Must(t.Parse(pageContent)) } -func (r *Ring) forget(ctx context.Context, id string) error { - unregister := func(in interface{}) (out interface{}, retry bool, err error) { - if in == nil { - return nil, false, fmt.Errorf("found empty ring when trying to unregister") - } - - ringDesc := in.(*Desc) - ringDesc.RemoveIngester(id) - return ringDesc, true, nil - } - return r.KVClient.CAS(ctx, r.key, unregister) -} - type ingesterDesc struct { ID string `json:"id"` State string `json:"state"` @@ -121,11 +106,33 @@ type httpResponse struct { ShowTokens bool `json:"-"` } -func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { +type ringAccess interface { + casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error + getRing(context.Context) (*Desc, error) +} + +type ringPageHandler struct { + r ringAccess + heartbeatPeriod time.Duration +} + +func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler { + return &ringPageHandler{ + r: r, + heartbeatPeriod: heartbeatPeriod, + } +} + +func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { if req.Method == http.MethodPost { ingesterID := req.FormValue("forget") - if err := r.forget(req.Context(), ingesterID); err != nil { - level.Error(r.logger).Log("msg", "error forgetting instance", "err", err) + if err := h.forget(req.Context(), ingesterID); err != nil { + http.Error( + w, + fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(), + http.StatusInternalServerError, + ) + return } // Implement PRG pattern to prevent double-POST and work with CSRF middleware. @@ -140,23 +147,26 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - r.mtx.RLock() - defer r.mtx.RUnlock() + ringDesc, err := h.r.getRing(req.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, ownedTokens := ringDesc.countTokens() ingesterIDs := []string{} - for id := range r.ringDesc.Ingesters { + for id := range ringDesc.Ingesters { ingesterIDs = append(ingesterIDs, id) } sort.Strings(ingesterIDs) now := time.Now() var ingesters []ingesterDesc - _, owned := r.countTokens() for _, id := range ingesterIDs { - ing := r.ringDesc.Ingesters[id] + ing := ringDesc.Ingesters[id] heartbeatTimestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(&ing, Reporting, now) { + if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { state = unhealthy } @@ -175,7 +185,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { Tokens: ing.Tokens, Zone: ing.Zone, NumTokens: len(ing.Tokens), - Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, + Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, }) } @@ -203,6 +213,19 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ } } +func (h *ringPageHandler) forget(ctx context.Context, id string) error { + unregister := func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*Desc) + ringDesc.RemoveIngester(id) + return ringDesc, true, nil + } + return h.r.casRing(ctx, unregister) +} + // WriteJSONResponse writes some JSON as a HTTP response. func writeJSONResponse(w http.ResponseWriter, v httpResponse) { w.Header().Set("Content-Type", "application/json") diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index be103e1fba..92ad34608f 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net/http" "os" "sort" "sync" @@ -26,17 +27,20 @@ type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control - NumTokens int `yaml:"num_tokens"` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - ObservePeriod time.Duration `yaml:"observe_period"` - JoinAfter time.Duration `yaml:"join_after"` - MinReadyDuration time.Duration `yaml:"min_ready_duration"` - InfNames []string `yaml:"interface_names"` - FinalSleep time.Duration `yaml:"final_sleep"` + NumTokens int `yaml:"num_tokens" category:"advanced"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"` + ObservePeriod time.Duration `yaml:"observe_period" category:"advanced"` + JoinAfter time.Duration `yaml:"join_after" category:"advanced"` + MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` + InfNames []string `yaml:"interface_names"` + + // FinalSleep's default value can be overridden by + // setting it before calling RegisterFlags or RegisterFlagsWithPrefix. + FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"` TokensFilePath string `yaml:"tokens_file_path"` Zone string `yaml:"availability_zone"` - UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` - ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health"` + UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"` + ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health" category:"advanced"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` @@ -47,12 +51,14 @@ type LifecyclerConfig struct { ListenPort int `yaml:"-"` } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. +// The default values of some flags can be changed; see docs of LifecyclerConfig. func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. +// The default values of some flags can be changed; see docs of LifecyclerConfig. func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f) @@ -67,7 +73,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive ring updates.") - f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") + f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", cfg.FinalSleep, "Duration to sleep for before exiting, to ensure metrics are scraped.") f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") hostname, err := os.Hostname() @@ -849,6 +855,23 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { time.Sleep(i.cfg.FinalSleep) } +func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return i.KVStore.CAS(ctx, i.RingKey, f) +} + +func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := i.KVStore.Get(ctx, i.RingKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req) +} + // unregister removes our entry from consul. func (i *Lifecycler) unregister(ctx context.Context) error { level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName) diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index 461429d6fa..b73227136d 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -20,8 +20,9 @@ type ReplicationSet struct { MaxUnavailableZones int } -// Do function f in parallel for all replicas in the set, erroring is we exceed +// Do function f in parallel for all replicas in the set, erroring if we exceed // MaxErrors and returning early otherwise. +// Return a slice of all results from f, or nil if an error occurred. func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) { type instanceResult struct { res interface{} diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6aaf165bf9..5553c6b721 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -8,11 +8,13 @@ import ( "fmt" "math" "math/rand" + "net/http" "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -513,27 +515,32 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro } // countTokens returns the number of tokens and tokens within the range for each instance. -// The ring read lock must be already taken when calling this function. -func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { - owned := map[string]uint32{} - numTokens := map[string]uint32{} - for i, token := range r.ringTokens { +func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { + var ( + owned = map[string]uint32{} + numTokens = map[string]uint32{} + + ringTokens = r.GetTokens() + ringInstanceByToken = r.getTokensInfo() + ) + + for i, token := range ringTokens { var diff uint32 // Compute how many tokens are within the range. - if i+1 == len(r.ringTokens) { - diff = (math.MaxUint32 - token) + r.ringTokens[0] + if i+1 == len(ringTokens) { + diff = (math.MaxUint32 - token) + ringTokens[0] } else { - diff = r.ringTokens[i+1] - token + diff = ringTokens[i+1] - token } - info := r.ringInstanceByToken[token] + info := ringInstanceByToken[token] numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } // Set to 0 the number of owned tokens by instances which don't have tokens yet. - for id := range r.ringDesc.Ingesters { + for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 numTokens[id] = 0 @@ -582,7 +589,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { prevOwners := r.reportedOwners r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.countTokens() + numTokens, ownedRange := r.ringDesc.countTokens() for id, totalOwned := range ownedRange { r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) @@ -840,6 +847,23 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) { } } +func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return r.KVClient.CAS(ctx, r.key, f) +} + +func (r *Ring) getRing(ctx context.Context) (*Desc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + ringDesc := proto.Clone(r.ringDesc).(*Desc) + + return ringDesc, nil +} + +func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req) +} + // Operation describes which instances can be included in the replica set, based on their state. // // Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states. diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go index e5da50bc7a..a7f29ab8cd 100644 --- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -26,7 +26,7 @@ type Loader func(r io.Reader) (interface{}, error) // Config holds the config for an Manager instance. // It holds config related to loading per-tenant config. type Config struct { - ReloadPeriod time.Duration `yaml:"period"` + ReloadPeriod time.Duration `yaml:"period" category:"advanced"` // LoadPath contains the path to the runtime config file, requires an // non-empty value LoadPath string `yaml:"file"` diff --git a/vendor/modules.txt b/vendor/modules.txt index 12bcf7b034..be72b52c70 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -590,7 +590,7 @@ github.com/google/pprof/profile # github.com/google/renameio/v2 v2.0.0 ## explicit; go 1.13 github.com/google/renameio/v2 -# github.com/google/uuid v1.3.0 +# github.com/google/uuid v1.2.0 ## explicit github.com/google/uuid # github.com/googleapis/gax-go/v2 v2.1.1 @@ -623,7 +623,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.4.2 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 +# github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 ## explicit; go 1.16 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency