Add usage report to Loki. (#5361)

* Adds leader election process

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fluke

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fixes the kv typecheck

* wire up the http client

* Hooking into loki services, hit a bug

* Add stats variable.

* Re-vendor dskit and improve the reporter service to never fail

* Instrument Loki with the package

* Add changelog entry

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes compactor test

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add configuration documentation

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/usagestats/reporter.go

Co-authored-by: Danny Kopping <dannykopping@gmail.com>

* Add boundary check

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add a log line for successful reports.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/usagestats/reporter.go

Co-authored-by: Danny Kopping <dannykopping@gmail.com>

Co-authored-by: Danny Kopping <dannykopping@gmail.com>
Commit bbaef790db (parent 02416736d0) on pull/5364/head, committed by Cyril Tovena via GitHub.
Changed files (lines changed):

1. CHANGELOG.md (1)
2. clients/pkg/promtail/targets/cloudflare/target.go (10)
3. clients/pkg/promtail/targets/cloudflare/target_test.go (26)
4. docs/sources/configuration/_index.md (13)
5. go.mod (4)
6. go.sum (7)
7. pkg/distributor/distributor.go (7)
8. pkg/ingester/flush.go (17)
9. pkg/ingester/ingester.go (27)
10. pkg/ingester/instance.go (5)
11. pkg/ingester/stream.go (5)
12. pkg/loghttp/push/push.go (6)
13. pkg/logql/metrics.go (18)
14. pkg/loki/loki.go (23)
15. pkg/loki/modules.go (30)
16. pkg/ruler/base/ruler.go (8)
17. pkg/storage/store.go (12)
18. pkg/storage/stores/shipper/compactor/compactor.go (13)
19. pkg/storage/stores/shipper/compactor/table.go (11)
20. pkg/storage/stores/shipper/downloads/index_set.go (10)
21. pkg/storage/stores/shipper/downloads/table.go (4)
22. pkg/usagestats/reporter.go (266)
23. pkg/usagestats/reporter_test.go (147)
24. pkg/usagestats/seed.go (33)
25. pkg/usagestats/stats.go (352)
26. pkg/usagestats/stats_test.go (97)
27. pkg/util/build/build.go (12)
28. pkg/util/server/error.go (2)
29. pkg/util/server/error_test.go (62)
30. vendor/github.com/google/uuid/null.go (118)
31. vendor/github.com/google/uuid/uuid.go (45)
32. vendor/github.com/google/uuid/version4.go (27)
33. vendor/github.com/grafana/dskit/backoff/backoff.go (6)
34. vendor/github.com/grafana/dskit/concurrency/runner.go (55)
35. vendor/github.com/grafana/dskit/crypto/tls/tls.go (10)
36. vendor/github.com/grafana/dskit/grpcclient/grpcclient.go (14)
37. vendor/github.com/grafana/dskit/kv/client.go (19)
38. vendor/github.com/grafana/dskit/kv/consul/client.go (18)
39. vendor/github.com/grafana/dskit/kv/etcd/etcd.go (6)
40. vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go (34)
41. vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go (6)
42. vendor/github.com/grafana/dskit/kv/multi.go (8)
43. vendor/github.com/grafana/dskit/ring/basic_lifecycler.go (18)
44. vendor/github.com/grafana/dskit/ring/http.go (73)
45. vendor/github.com/grafana/dskit/ring/lifecycler.go (45)
46. vendor/github.com/grafana/dskit/ring/replication_set.go (3)
47. vendor/github.com/grafana/dskit/ring/ring.go (46)
48. vendor/github.com/grafana/dskit/runtimeconfig/manager.go (2)
49. vendor/modules.txt (4)

@ -1,5 +1,6 @@
## Main
* [5361](https://github.com/grafana/loki/pull/5361) **ctovena**: Add usage report to grafana.com.
* [5289](https://github.com/grafana/loki/pull/5289) **ctovena**: Fix deduplication bug in queries when mutating labels.
* [5302](https://github.com/grafana/loki/pull/5302) **MasslessParticle**: Update azure blobstore client to use new sdk.
* [5243](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update Promtail to support duration string formats.

@ -109,11 +109,11 @@ func (t *Target) start() {
end = maxEnd
}
start := end.Add(-time.Duration(t.config.PullRange))
requests := splitRequests(start, end, t.config.Workers)
// Use a background context for workers as we don't want to cancel halfway through.
// In case of errors we stop the target; each worker has its own retry logic.
if err := concurrency.ForEach(context.Background(), splitRequests(start, end, t.config.Workers), t.config.Workers, func(ctx context.Context, job interface{}) error {
request := job.(pullRequest)
if err := concurrency.ForEachJob(context.Background(), len(requests), t.config.Workers, func(ctx context.Context, idx int) error {
request := requests[idx]
return t.pull(ctx, request.start, request.end)
}); err != nil {
level.Error(t.logger).Log("msg", "failed to pull logs", "err", err, "start", start, "end", end)
@ -229,9 +229,9 @@ type pullRequest struct {
end time.Time
}
func splitRequests(start, end time.Time, workers int) []interface{} {
func splitRequests(start, end time.Time, workers int) []pullRequest {
perWorker := end.Sub(start) / time.Duration(workers)
var requests []interface{}
var requests []pullRequest
for i := 0; i < workers; i++ {
r := pullRequest{
start: start.Add(time.Duration(i) * perWorker),

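The two hunks above migrate promtail's cloudflare target from dskit's `concurrency.ForEach`, which boxes jobs into `[]interface{}` and forces type assertions, to the index-based `concurrency.ForEachJob` added by the re-vendored dskit. Assuming the `ForEachJob(ctx, jobCount, concurrency, fn)` signature used in these hunks, the general migration pattern looks roughly like this sketch:

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	// Jobs are addressed by index, so no []interface{} boxing or type
	// assertions are needed: each worker indexes into the typed slice
	// it already has in scope.
	items := []string{"a", "b", "c", "d"}
	err := concurrency.ForEachJob(context.Background(), len(items), 2, func(ctx context.Context, idx int) error {
		fmt.Println("processing", items[idx])
		return nil
	})
	if err != nil {
		fmt.Println("err:", err)
	}
}
```

Keeping the slice strongly typed is exactly what lets `splitRequests` return `[]pullRequest` instead of `[]interface{}` above; the same pattern repeats in the ruler, compactor, and shipper hunks below.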
@ -251,26 +251,26 @@ func Test_splitRequests(t *testing.T) {
tests := []struct {
start time.Time
end time.Time
want []interface{}
want []pullRequest
}{
// perfectly divisible
{
time.Unix(0, 0),
time.Unix(0, int64(time.Minute)),
[]interface{}{
pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
[]pullRequest{
{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
},
},
// not divisible
{
time.Unix(0, 0),
time.Unix(0, int64(time.Minute+1)),
[]interface{}{
pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
[]pullRequest{
{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
},
},
}
@ -279,11 +279,11 @@ func Test_splitRequests(t *testing.T) {
got := splitRequests(tt.start, tt.end, 3)
if !assert.Equal(t, tt.want, got) {
for i := range got {
if !assert.Equal(t, tt.want[i].(pullRequest).start, got[i].(pullRequest).start) {
t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].(pullRequest).start.UnixNano(), got[i].(pullRequest).start.UnixNano())
if !assert.Equal(t, tt.want[i].start, got[i].start) {
t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].start.UnixNano(), got[i].start.UnixNano())
}
if !assert.Equal(t, tt.want[i].(pullRequest).end, got[i].(pullRequest).end) {
t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].(pullRequest).end.UnixNano(), got[i].(pullRequest).end.UnixNano())
if !assert.Equal(t, tt.want[i].end, got[i].end) {
t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].end.UnixNano(), got[i].end.UnixNano())
}
}
}

@ -164,6 +164,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# If a more specific configuration is given in other sections,
# the related configuration within this section will be ignored.
[common: <common>]
# Configuration for usage report
[usage_report: <usage_report>]
```
## server
@ -2496,6 +2499,16 @@ This way, one doesn't have to replicate configuration in multiple places.
[ring: <ring>]
```
## usage_report
This block allows you to configure Loki's anonymous usage reporting to grafana.com.
```yaml
# Whether usage reporting should be disabled.
# CLI flag: -usage-report.disabled
[disabled: <boolean> | default = false]
```
### storage
The common `storage` block defines a common storage to be reused by different

@ -42,7 +42,7 @@ require (
github.com/google/go-cmp v0.5.6
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5
github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/consul/api v1.12.0
@ -105,6 +105,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.0.0-00010101000000-000000000000
github.com/google/renameio/v2 v2.0.0
github.com/google/uuid v1.2.0
github.com/mattn/go-ieproxy v0.0.1
github.com/xdg-go/scram v1.0.2
gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0
@ -185,7 +186,6 @@ require (
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/gophercloud/gophercloud v0.24.0 // indirect

@ -994,9 +994,8 @@ github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxeh
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww=
github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@ -1033,8 +1032,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 h1:IXo/V2+KKLYLD724qh3uRaZgAy3BV3HdtXuSs7lb3jU=
github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE=
github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 h1:R0Pw7VjouhYSS7bsMdxEidcJbCq1KUBCzPgsh7805NM=
github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0/go.mod h1:Q9WmQ9cVkrHx6g4KSl6TN+N3vEOkDZd9RtyXCHd5OPQ=
github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 h1:aEOagXOTqtN9gd4jiDuP/5a81HdoJBqkVfn8WaxbsK4=
github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8/go.mod h1:QAvS2C7TtQRhhv9Uf/sxD+BUhpkrPFm5jK/9MzUiDCY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=

@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
@ -37,7 +38,10 @@ const (
ringKey = "distributor"
)
var maxLabelCacheSize = 100000
var (
maxLabelCacheSize = 100000
rfStats = usagestats.NewInt("distributor_replication_factor")
)
// Config for a Distributor.
type Config struct {
@ -168,6 +172,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
}),
}
d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
rfStats.Set(int64(ingestersRing.ReplicationFactor()))
servs = append(servs, d.pool)
d.subservices, err = services.NewManager(servs...)

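`rfStats` is one of the new usage-stats values threaded through the codebase. As `pkg/usagestats/stats.go` later in this diff shows, `NewInt` is a thin wrapper over the standard library's `expvar`, publishing under a `github.com/grafana/loki/` prefix that the reporter strips when building the report. A minimal sketch of that mechanism, using `expvar` directly:

```go
package main

import (
	"expvar"
	"fmt"
)

// statsPrefix mirrors the prefix used by pkg/usagestats in this PR.
const statsPrefix = "github.com/grafana/loki/"

func main() {
	// usagestats.NewInt is essentially expvar.NewInt with the prefix applied.
	rf := expvar.NewInt(statsPrefix + "distributor_replication_factor")
	rf.Set(3)

	// The reporter later walks the published vars and strips the prefix,
	// so this value lands in the report as "distributor_replication_factor".
	if v := expvar.Get(statsPrefix + "distributor_replication_factor"); v != nil {
		fmt.Println("reported value:", v.String()) // prints: 3
	}
}
```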
@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
loki_util "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
@ -90,6 +91,12 @@ var (
// 1h -> 8hr
Buckets: prometheus.LinearBuckets(1, 1, 8),
})
flushedChunksStats = usagestats.NewCounter("ingester_flushed_chunks")
flushedChunksBytesStats = usagestats.NewStatistics("ingester_flushed_chunks_bytes")
flushedChunksLinesStats = usagestats.NewStatistics("ingester_flushed_chunks_lines")
flushedChunksAgeStats = usagestats.NewStatistics("ingester_flushed_chunks_age_seconds")
flushedChunksLifespanStats = usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds")
flushedChunksUtilizationStats = usagestats.NewStatistics("ingester_flushed_chunks_utilization")
)
const (
@ -382,6 +389,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
if err := i.store.Put(ctx, wireChunks); err != nil {
return err
}
flushedChunksStats.Inc(int64(len(wireChunks)))
// Record statistics only when actual put request did not return error.
sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
@ -408,7 +416,8 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}
chunkUtilization.Observe(wc.Data.Utilization())
utilization := wc.Data.Utilization()
chunkUtilization.Observe(utilization)
chunkEntries.Observe(float64(numEntries))
chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
@ -416,6 +425,12 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
firstTime, lastTime := cs[i].chunk.Bounds()
chunkAge.Observe(time.Since(firstTime).Seconds())
chunkLifespan.Observe(lastTime.Sub(firstTime).Hours())
flushedChunksBytesStats.Record(compressedSize)
flushedChunksLinesStats.Record(float64(numEntries))
flushedChunksUtilizationStats.Record(utilization)
flushedChunksAgeStats.Record(time.Since(firstTime).Seconds())
flushedChunksLifespanStats.Record(lastTime.Sub(firstTime).Hours())
}
return nil

@ -32,6 +32,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
errUtil "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
@ -45,12 +46,18 @@ const (
// ErrReadOnly is returned when the ingester is shutting down and a push was
// attempted.
var ErrReadOnly = errors.New("Ingester is shutting down")
var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_flush_queue_length",
Help: "The total number of series pending in the flush queue.",
})
var (
ErrReadOnly = errors.New("Ingester is shutting down")
flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingester_flush_queue_length",
Help: "The total number of series pending in the flush queue.",
})
compressionStats = usagestats.NewString("ingester_compression")
targetSizeStats = usagestats.NewInt("ingester_target_size_bytes")
walStats = usagestats.NewString("ingester_wal")
activeTenantsStats = usagestats.NewInt("ingester_active_tenants")
)
// Config for an ingester.
type Config struct {
@ -215,7 +222,12 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
compressionStats.Set(cfg.ChunkEncoding)
targetSizeStats.Set(int64(cfg.TargetChunkSize))
walStats.Set("disabled")
if cfg.WAL.Enabled {
walStats.Set("enabled")
}
metrics := newIngesterMetrics(registerer)
i := &Ingester{
@ -549,6 +561,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) *instance {
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter)
i.instances[instanceID] = inst
activeTenantsStats.Set(int64(len(i.instances)))
}
return inst
}

@ -26,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/math"
@ -53,6 +54,8 @@ var (
Name: "ingester_streams_removed_total",
Help: "The total number of streams removed per tenant.",
}, []string{"tenant"})
streamsCountStats = usagestats.NewInt("ingester_streams_count")
)
type instance struct {
@ -248,6 +251,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord
memoryStreams.WithLabelValues(i.instanceID).Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(s)
streamsCountStats.Add(1)
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
@ -288,6 +292,7 @@ func (i *instance) removeStream(s *stream) {
i.index.Delete(s.labels, s.fp)
i.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Dec()
streamsCountStats.Add(-1)
}
}

@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
@ -47,6 +48,8 @@ var (
Buckets: prometheus.ExponentialBuckets(5, 2, 6),
})
chunkCreatedStats = usagestats.NewCounter("ingester_chunk_created")
)
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
@ -203,6 +206,7 @@ func (s *stream) Push(
chunk: s.NewChunk(),
})
chunksCreatedTotal.Inc()
chunkCreatedStats.Inc(1)
}
var storedEntries []logproto.Entry
@ -379,6 +383,7 @@ func (s *stream) cutChunk(ctx context.Context) *chunkDesc {
samplesPerChunk.Observe(float64(chunk.chunk.Size()))
blocksPerChunk.Observe(float64(chunk.chunk.BlockCount()))
chunksCreatedTotal.Inc()
chunkCreatedStats.Inc(1)
s.chunks = append(s.chunks, chunkDesc{
chunk: s.NewChunk(),

@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
loki_util "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/unmarshal"
@ -39,6 +40,9 @@ var (
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})
bytesReceivedStats = usagestats.NewCounter("distributor_bytes_received")
linesReceivedStats = usagestats.NewCounter("distributor_lines_received")
)
const applicationJSON = "application/json"
@ -130,6 +134,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
totalEntries++
entriesSize += int64(len(e.Line))
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line))))
bytesReceivedStats.Inc(int64(len(e.Line)))
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
@ -140,6 +145,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
if totalEntries != 0 && userID != "" {
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}
linesReceivedStats.Inc(totalEntries)
level.Debug(logger).Log(
"msg", "push request parsed",

@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -64,6 +65,11 @@ var (
Name: "logql_querystats_ingester_sent_lines_total",
Help: "Total count of lines sent from ingesters while executing LogQL queries.",
})
bytePerSecondMetricUsage = usagestats.NewStatistics("query_metric_bytes_per_second")
bytePerSecondLogUsage = usagestats.NewStatistics("query_log_bytes_per_second")
linePerSecondMetricUsage = usagestats.NewStatistics("query_metric_lines_per_second")
linePerSecondLogUsage = usagestats.NewStatistics("query_log_lines_per_second")
)
func RecordMetrics(ctx context.Context, p Params, status string, stats logql_stats.Result, result promql_parser.Value) {
@ -125,6 +131,18 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats logql_sta
chunkDownloadedTotal.WithLabelValues(status, queryType, rt).
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))
recordUsageStats(queryType, stats)
}
func recordUsageStats(queryType string, stats logql_stats.Result) {
if queryType == QueryTypeMetric {
bytePerSecondMetricUsage.Record(float64(stats.Summary.BytesProcessedPerSecond))
linePerSecondMetricUsage.Record(float64(stats.Summary.LinesProcessedPerSecond))
} else {
bytePerSecondLogUsage.Record(float64(stats.Summary.BytesProcessedPerSecond))
linePerSecondLogUsage.Record(float64(stats.Summary.LinesProcessedPerSecond))
}
}
func QueryType(query string) (string, error) {

@ -45,6 +45,7 @@ import (
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/tracing"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/fakeauth"
util_log "github.com/grafana/loki/pkg/util/log"
@ -79,6 +80,7 @@ type Config struct {
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
QueryScheduler scheduler.Config `yaml:"query_scheduler"`
UsageReport usagestats.Config `yaml:"usage_report"`
}
// RegisterFlags registers flag.
@ -115,6 +117,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Tracing.RegisterFlags(f)
c.CompactorConfig.RegisterFlags(f)
c.QueryScheduler.RegisterFlags(f)
c.UsageReport.RegisterFlags(f)
}
func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
@ -245,6 +248,7 @@ type Loki struct {
compactor *compactor.Compactor
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
usageReport *usagestats.Reporter
clientMetrics chunk_storage.ClientMetrics
@ -481,6 +485,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(UsageReport, t.initUsageReport)
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
@ -492,17 +497,17 @@ func (t *Loki) setupModuleManager() error {
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs},
Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport},
Store: {Overrides},
Ingester: {Store, Server, MemberlistKV, TenantConfigs},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {Server, Overrides, MemberlistKV},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs},
TableManager: {Server},
Compactor: {Server, Overrides, MemberlistKV},
IndexGateway: {Server},
QueryFrontend: {QueryFrontendTripperware, UsageReport},
QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
TableManager: {Server, UsageReport},
Compactor: {Server, Overrides, MemberlistKV, UsageReport},
IndexGateway: {Server, UsageReport},
IngesterQuerier: {Ring},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor},

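Adding `UsageReport` to these dependency lists makes dskit's module manager initialize the reporter before the modules that record stats. A rough sketch of how that wiring behaves, assuming dskit's `modules` API of this vintage (`NewManager`, `RegisterModule`, `AddDependency`, `InitModuleServices`); note that an init function may return a nil service, as `initUsageReport` does when reporting is disabled, and the manager simply skips it:

```go
package main

import (
	"fmt"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/modules"
	"github.com/grafana/dskit/services"
)

func main() {
	mm := modules.NewManager(log.NewNopLogger())

	// Modules whose init functions return a nil service are tolerated.
	mm.RegisterModule("usage-report", func() (services.Service, error) { return nil, nil })
	mm.RegisterModule("distributor", func() (services.Service, error) { return nil, nil })

	// Declaring the edge ensures "usage-report" is initialized first.
	if err := mm.AddDependency("distributor", "usage-report"); err != nil {
		fmt.Println("err:", err)
	}

	svcs, err := mm.InitModuleServices("distributor")
	fmt.Println(len(svcs), err)
}
```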
@ -18,6 +18,7 @@ import (
"github.com/grafana/dskit/runtimeconfig"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/weaveworks/common/middleware"
@ -49,6 +50,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
serverutil "github.com/grafana/loki/pkg/util/server"
@ -82,6 +84,7 @@ const (
All string = "all"
Read string = "read"
Write string = "write"
UsageReport string = "usage-report"
)
func (t *Loki) initServer() (services.Service, error) {
@ -749,6 +752,33 @@ func (t *Loki) initQueryScheduler() (services.Service, error) {
return s, nil
}
func (t *Loki) initUsageReport() (services.Service, error) {
if t.Cfg.UsageReport.Disabled {
return nil, nil
}
t.Cfg.UsageReport.Leader = false
if t.isModuleActive(Ingester) {
t.Cfg.UsageReport.Leader = true
}
usagestats.Edition("oss")
usagestats.Target(t.Cfg.Target.String())
period, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now())
if err != nil {
return nil, err
}
objectClient, err := chunk_storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig.Config, t.clientMetrics)
if err != nil {
return nil, err
}
ur, err := usagestats.NewReporter(t.Cfg.UsageReport, t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore, objectClient, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
t.usageReport = ur
return ur, nil
}
func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")

@ -427,7 +427,7 @@ func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if r.cfg.EnableSharding {
r.ring.ServeHTTP(w, req)
} else {
var unshardedPage = `
unshardedPage := `
<!DOCTYPE html>
<html>
<head>
@ -769,9 +769,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta
// Concurrently fetch rules from all rulers. Since rules are not replicated,
// we need all requests to succeed.
jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses())
err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
addr := job.(string)
addresses := rulers.GetAddresses()
err = concurrency.ForEachJob(ctx, len(addresses), len(addresses), func(ctx context.Context, idx int) error {
addr := addresses[idx]
rulerClient, err := r.clientsPool.GetClientFor(addr)
if err != nil {

@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
)
@ -31,6 +32,9 @@ var (
errCurrentBoltdbShipperNon24Hours = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period")
errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h")
errZeroLengthConfig = errors.New("must specify at least one schema configuration")
indexTypeStats = usagestats.NewString("store_index_type")
objectTypeStats = usagestats.NewString("store_object_type")
schemaStats = usagestats.NewString("store_schema")
)
// Config is the loki storage configuration
@ -125,6 +129,14 @@ type store struct {
// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, schemaCfg SchemaConfig, chunkStore chunk.Store, registerer prometheus.Registerer) (Store, error) {
if len(schemaCfg.Configs) != 0 {
if index := ActivePeriodConfig(schemaCfg.Configs); index != -1 && index < len(schemaCfg.Configs) {
indexTypeStats.Set(schemaCfg.Configs[index].IndexType)
objectTypeStats.Set(schemaCfg.Configs[index].ObjectType)
schemaStats.Set(schemaCfg.Configs[index].Schema)
}
}
return &store{
Store: chunkStore,
cfg: cfg,

@ -26,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -52,6 +53,11 @@ const (
ringNumTokens = 1
)
var (
retentionEnabledStats = usagestats.NewString("compactor_retention_enabled")
defaultRetentionStats = usagestats.NewString("compactor_default_retention")
)
type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
@ -119,6 +125,13 @@ type Compactor struct {
}
func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, clientMetrics storage.ClientMetrics, r prometheus.Registerer) (*Compactor, error) {
retentionEnabledStats.Set("false")
if cfg.RetentionEnabled {
retentionEnabledStats.Set("true")
}
if limits != nil {
defaultRetentionStats.Set(limits.DefaultLimits().RetentionPeriod.String())
}
if cfg.SharedStoreType == "" {
return nil, errors.New("compactor shared_store_type must be specified")
}

@ -264,18 +264,13 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
return err
}
jobs := make([]interface{}, len(files))
for i := 0; i < len(files); i++ {
jobs[i] = i
}
return concurrency.ForEach(t.ctx, jobs, readDBsConcurrency, func(ctx context.Context, job interface{}) error {
workNum := job.(int)
return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error {
workNum := idx
// skip seed file
if workNum == t.seedSourceFileIdx {
return nil
}
fileName := files[workNum].Name
fileName := files[idx].Name
downloadAt := filepath.Join(t.workingDirectory, fileName)
err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(fileName),

@ -390,13 +390,8 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind
downloadedFiles := make([]string, 0, len(files))
downloadedFilesMtx := sync.Mutex{}
jobs := make([]interface{}, len(files))
for i := 0; i < len(files); i++ {
jobs[i] = i
}
err := concurrency.ForEach(ctx, jobs, maxDownloadConcurrency, func(ctx context.Context, job interface{}) error {
fileName := files[job.(int)].Name
err := concurrency.ForEachJob(ctx, len(files), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
fileName := files[idx].Name
err := t.downloadFileFromStorage(ctx, fileName, t.cacheLocation)
if err != nil {
if t.baseIndexSet.IsFileNotFoundErr(err) {
@ -412,7 +407,6 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind
return nil
})
if err != nil {
return nil, err
}

@ -308,8 +308,8 @@ func (t *Table) EnsureQueryReadiness(ctx context.Context) error {
// downloadUserIndexes downloads user specific index files concurrently.
func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
return concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), maxDownloadConcurrency, func(ctx context.Context, userID interface{}) error {
indexSet, err := t.getOrCreateIndexSet(userID.(string))
return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
indexSet, err := t.getOrCreateIndexSet(userIDs[idx])
if err != nil {
return err
}

@ -0,0 +1,266 @@
package usagestats
import (
"bytes"
"context"
"flag"
"io"
"math"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/build"
)
const (
// File name for the cluster seed file.
ClusterSeedFileName = "loki_cluster_seed.json"
// attemptNumber is how many times we will try to read a corrupted cluster seed before deleting it.
attemptNumber = 4
// seedKey is the key for the cluster seed to use with the kv store.
seedKey = "usagestats_token"
)
var (
reportCheckInterval = time.Minute
reportInterval = 1 * time.Hour
)
type Config struct {
Disabled bool `yaml:"disabled"`
Leader bool `yaml:"-"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Disabled, "usage-report.disabled", false, "Disable anonymous usage reporting.")
}
type Reporter struct {
kvClient kv.Client
logger log.Logger
objectClient chunk.ObjectClient
reg prometheus.Registerer
services.Service
conf Config
cluster *ClusterSeed
lastReport time.Time
}
func NewReporter(config Config, kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) {
if config.Disabled {
return nil, nil
}
kvClient, err := kv.NewClient(kvConfig, JSONCodec, kv.RegistererWithKVName(reg, "usagestats"), logger)
if err != nil {
return nil, err
}
r := &Reporter{
kvClient: kvClient,
logger: logger,
objectClient: objectClient,
conf: config,
reg: reg,
}
r.Service = services.NewBasicService(nil, r.running, nil)
return r, nil
}
func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
// Try to become leader via the kv client
for backoff := backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: time.Minute,
MaxRetries: 0,
}); ; backoff.Ongoing() {
// create a new cluster seed
seed := ClusterSeed{
UID: uuid.NewString(),
PrometheusVersion: build.GetVersion(),
CreatedAt: time.Now(),
}
if err := rep.kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
// The key is already set, so we don't need to do anything
if in != nil {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID {
seed = *kvSeed
return nil, false, nil
}
}
return seed, true, nil
}); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
// Fetch the remote cluster seed.
remoteSeed, err := rep.fetchSeed(ctx,
func(err error) bool {
// we only want to retry if the error is not an object not found error
return !rep.objectClient.IsObjectNotFoundErr(err)
})
if err != nil {
if rep.objectClient.IsObjectNotFoundErr(err) {
// we are the leader and we need to save the file.
if err := rep.writeSeedFile(ctx, seed); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
return &seed
}
continue
}
return remoteSeed
}
}
func (rep *Reporter) init(ctx context.Context) {
if rep.conf.Leader {
rep.cluster = rep.initLeader(ctx)
return
}
// Followers only wait for the cluster seed to be set;
// they will keep trying forever to fetch it.
seed, _ := rep.fetchSeed(ctx, nil)
rep.cluster = seed
}
// fetchSeed fetches the cluster seed from the object store, retrying until it succeeds.
// continueFn lets you decide whether to keep retrying; nil means always retry.
func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (*ClusterSeed, error) {
var (
backoff = backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: time.Minute,
MaxRetries: 0,
})
readingErr = 0
)
for backoff.Ongoing() {
seed, err := rep.readSeedFile(ctx)
if err != nil {
if !rep.objectClient.IsObjectNotFoundErr(err) {
readingErr++
}
level.Debug(rep.logger).Log("msg", "failed to read cluster seed file", "err", err)
if readingErr > attemptNumber {
if err := rep.objectClient.DeleteObject(ctx, ClusterSeedFileName); err != nil {
level.Error(rep.logger).Log("msg", "failed to delete corrupted cluster seed file", "err", err)
}
readingErr = 0
}
if continueFn == nil || continueFn(err) {
continue
}
return nil, err
}
return seed, nil
}
return nil, backoff.Err()
}
// readSeedFile reads the cluster seed file from the object store.
func (rep *Reporter) readSeedFile(ctx context.Context) (*ClusterSeed, error) {
reader, _, err := rep.objectClient.GetObject(ctx, ClusterSeedFileName)
if err != nil {
return nil, err
}
defer func() {
if err := reader.Close(); err != nil {
level.Error(rep.logger).Log("msg", "failed to close reader", "err", err)
}
}()
data, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
seed, err := JSONCodec.Decode(data)
if err != nil {
return nil, err
}
return seed.(*ClusterSeed), nil
}
// writeSeedFile writes the cluster seed to the object store.
func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error {
data, err := JSONCodec.Encode(seed)
if err != nil {
return err
}
return rep.objectClient.PutObject(ctx, ClusterSeedFileName, bytes.NewReader(data))
}
// running initializes the reporter seed and starts sending a report every interval.
func (rep *Reporter) running(ctx context.Context) error {
rep.init(ctx)
// check every minute if we should report.
ticker := time.NewTicker(reportCheckInterval)
defer ticker.Stop()
// find when to send the next report.
next := nextReport(reportInterval, rep.cluster.CreatedAt, time.Now())
if rep.lastReport.IsZero() {
// if we have never reported, assume the last report happened one interval ago.
rep.lastReport = next.Add(-reportInterval)
}
for {
select {
case <-ticker.C:
now := time.Now()
if !next.Equal(now) && now.Sub(rep.lastReport) < reportInterval {
continue
}
level.Debug(rep.logger).Log("msg", "reporting cluster stats", "date", time.Now())
if err := rep.reportUsage(ctx, next); err != nil {
level.Info(rep.logger).Log("msg", "failed to report usage", "err", err)
continue
}
rep.lastReport = next
next = next.Add(reportInterval)
case <-ctx.Done():
return ctx.Err()
}
}
}
// reportUsage reports the usage to grafana.com.
func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error {
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: 30 * time.Second,
MaxRetries: 5,
})
var errs multierror.MultiError
for backoff.Ongoing() {
if err := sendReport(ctx, rep.cluster, interval); err != nil {
level.Info(rep.logger).Log("msg", "failed to send usage report", "retries", backoff.NumRetries(), "err", err)
errs.Add(err)
backoff.Wait()
continue
}
level.Debug(rep.logger).Log("msg", "usage report sent successfully")
return nil
}
return errs.Err()
}
// nextReport computes the next report time based on the interval.
// The interval is anchored to the creation time of the cluster seed, so that all clusters do not report at the same time.
func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
// find the smallest x such that createdAt + (x * interval) >= now
return createdAt.Add(time.Duration(math.Ceil(float64(now.Sub(createdAt))/float64(interval))) * interval)
}
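
`nextReport` keeps every cluster on its own schedule: report times are whole multiples of the interval measured from the seed's `CreatedAt`. A self-contained sketch of the same formula with a worked example (the dates are made up for illustration):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// nextReport restates the formula above: the smallest whole number of
// intervals x such that createdAt + x*interval >= now.
func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
	x := math.Ceil(float64(now.Sub(createdAt)) / float64(interval))
	return createdAt.Add(time.Duration(x) * interval)
}

func main() {
	createdAt := time.Date(2022, 2, 9, 10, 18, 0, 0, time.UTC)
	now := time.Date(2022, 2, 9, 14, 0, 0, 0, time.UTC)
	// now - createdAt = 3h42m, so x = ceil(3.7) = 4 intervals:
	// the schedule stays anchored at …10:18, 11:18, 12:18, 13:18, 14:18…
	fmt.Println(nextReport(time.Hour, createdAt, now)) // 2022-02-09 14:18:00 +0000 UTC
}
```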

@ -0,0 +1,147 @@
package usagestats
import (
"context"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/kv"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/storage"
)
var metrics = storage.NewClientMetrics()
func Test_LeaderElection(t *testing.T) {
result := make(chan *ClusterSeed, 10)
objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
Directory: t.TempDir(),
},
}, metrics)
require.NoError(t, err)
for i := 0; i < 3; i++ {
go func() {
r, err := NewReporter(Config{Leader: true}, kv.Config{
Store: "inmemory",
}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)
r.init(context.Background())
result <- r.cluster
}()
}
for i := 0; i < 7; i++ {
go func() {
r, err := NewReporter(Config{Leader: false}, kv.Config{
Store: "inmemory",
}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)
r.init(context.Background())
result <- r.cluster
}()
}
var UID []string
for i := 0; i < 10; i++ {
cluster := <-result
require.NotNil(t, cluster)
UID = append(UID, cluster.UID)
}
first := UID[0]
for _, uid := range UID {
require.Equal(t, first, uid)
}
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, JSONCodec, prometheus.DefaultRegisterer, log.NewLogfmtLogger(os.Stdout))
require.NoError(t, err)
// verify that the ID found is also correctly stored in the kv store and not overridden by another leader.
data, err := kvClient.Get(context.Background(), seedKey)
require.NoError(t, err)
require.Equal(t, data.(*ClusterSeed).UID, first)
}
func Test_ReportLoop(t *testing.T) {
// stub
reportCheckInterval = 100 * time.Millisecond
reportInterval = time.Second
totalReport := 0
clusterIDs := []string{}
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
var received Report
totalReport++
require.NoError(t, jsoniter.NewDecoder(r.Body).Decode(&received))
clusterIDs = append(clusterIDs, received.ClusterID)
rw.WriteHeader(http.StatusOK)
}))
usageStatsURL = server.URL
objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
Directory: t.TempDir(),
},
}, metrics)
require.NoError(t, err)
r, err := NewReporter(Config{Leader: true}, kv.Config{
Store: "inmemory",
}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)
r.initLeader(context.Background())
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-time.After(6 * time.Second)
cancel()
}()
require.Equal(t, context.Canceled, r.running(ctx))
require.GreaterOrEqual(t, totalReport, 5)
first := clusterIDs[0]
for _, uid := range clusterIDs {
require.Equal(t, first, uid)
}
require.Equal(t, first, r.cluster.UID)
}
func Test_NextReport(t *testing.T) {
fixtures := map[string]struct {
interval time.Duration
createdAt time.Time
now time.Time
next time.Time
}{
"createdAt aligned with interval and now": {
interval: 1 * time.Hour,
createdAt: time.Unix(0, time.Hour.Nanoseconds()),
now: time.Unix(0, 2*time.Hour.Nanoseconds()),
next: time.Unix(0, 2*time.Hour.Nanoseconds()),
},
"createdAt aligned with interval": {
interval: 1 * time.Hour,
createdAt: time.Unix(0, time.Hour.Nanoseconds()),
now: time.Unix(0, 2*time.Hour.Nanoseconds()+1),
next: time.Unix(0, 3*time.Hour.Nanoseconds()),
},
"createdAt not aligned": {
interval: 1 * time.Hour,
createdAt: time.Unix(0, time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()),
now: time.Unix(0, 2*time.Hour.Nanoseconds()+1),
next: time.Unix(0, 2*time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()),
},
}
for name, f := range fixtures {
t.Run(name, func(t *testing.T) {
next := nextReport(f.interval, f.createdAt, f.now)
require.Equal(t, f.next, next)
})
}
}

@ -0,0 +1,33 @@
package usagestats
import (
"time"
jsoniter "github.com/json-iterator/go"
prom "github.com/prometheus/prometheus/web/api/v1"
)
type ClusterSeed struct {
UID string `json:"UID"`
CreatedAt time.Time `json:"created_at"`
prom.PrometheusVersion `json:"version"`
}
var JSONCodec = jsonCodec{}
type jsonCodec struct{}
// TODO: we need to use the default codec for the rest of the code;
// currently crashing because the in-memory kvstore uses a singleton.
func (jsonCodec) Decode(data []byte) (interface{}, error) {
var seed ClusterSeed
if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil {
return nil, err
}
return &seed, nil
}
func (jsonCodec) Encode(obj interface{}) ([]byte, error) {
return jsoniter.ConfigFastest.Marshal(obj)
}
func (jsonCodec) CodecID() string { return "usagestats.jsonCodec" }

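The codec above is what the kv store uses to (de)serialize the seed during leader election. A small round-trip sketch with jsoniter's `ConfigFastest`, using a trimmed-down stand-in for `ClusterSeed` (the real struct also embeds `prom.PrometheusVersion`):

```go
package main

import (
	"fmt"
	"time"

	jsoniter "github.com/json-iterator/go"
)

// ClusterSeed here is a reduced illustration of the struct in seed.go.
type ClusterSeed struct {
	UID       string    `json:"UID"`
	CreatedAt time.Time `json:"created_at"`
}

func main() {
	in := ClusterSeed{UID: "abc-123", CreatedAt: time.Now().UTC()}

	// Encode, as jsonCodec.Encode does.
	data, err := jsoniter.ConfigFastest.Marshal(in)
	if err != nil {
		panic(err)
	}

	// Decode, as jsonCodec.Decode does.
	var out ClusterSeed
	if err := jsoniter.ConfigFastest.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(data), out.UID == in.UID) // ... true
}
```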
@ -0,0 +1,352 @@
package usagestats
import (
"bytes"
"context"
"encoding/json"
"expvar"
"fmt"
"io"
"math"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/grafana/loki/pkg/util/build"
"github.com/cespare/xxhash/v2"
jsoniter "github.com/json-iterator/go"
prom "github.com/prometheus/prometheus/web/api/v1"
"go.uber.org/atomic"
)
var (
httpClient = http.Client{Timeout: 5 * time.Second}
usageStatsURL = "https://stats.grafana.org/loki-usage-report"
statsPrefix = "github.com/grafana/loki/"
targetKey = "target"
editionKey = "edition"
)
// Report is the JSON object sent to the stats server
type Report struct {
ClusterID string `json:"clusterID"`
CreatedAt time.Time `json:"createdAt"`
Interval time.Time `json:"interval"`
Target string `json:"target"`
prom.PrometheusVersion `json:"version"`
Os string `json:"os"`
Arch string `json:"arch"`
Edition string `json:"edition"`
Metrics map[string]interface{} `json:"metrics"`
}
// sendReport sends the report to the stats server
func sendReport(ctx context.Context, seed *ClusterSeed, interval time.Time) error {
report := buildReport(seed, interval)
out, err := jsoniter.MarshalIndent(report, "", " ")
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, usageStatsURL, bytes.NewBuffer(out))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := httpClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
data, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf("failed to send usage stats: %s body: %s", resp.Status, string(data))
}
return nil
}
// buildReport builds the report to be sent to the stats server
func buildReport(seed *ClusterSeed, interval time.Time) Report {
var (
targetName string
editionName string
)
if target := expvar.Get(statsPrefix + targetKey); target != nil {
if target, ok := target.(*expvar.String); ok {
targetName = target.Value()
}
}
if edition := expvar.Get(statsPrefix + editionKey); edition != nil {
if edition, ok := edition.(*expvar.String); ok {
editionName = edition.Value()
}
}
return Report{
ClusterID: seed.UID,
PrometheusVersion: build.GetVersion(),
CreatedAt: seed.CreatedAt,
Interval: interval,
Os: runtime.GOOS,
Arch: runtime.GOARCH,
Target: targetName,
Edition: editionName,
Metrics: buildMetrics(),
}
}
// buildMetrics builds the metrics part of the report to be sent to the stats server
func buildMetrics() map[string]interface{} {
result := map[string]interface{}{
"memstats": memstats(),
"num_cpu": runtime.NumCPU(),
"num_goroutine": runtime.NumGoroutine(),
}
expvar.Do(func(kv expvar.KeyValue) {
if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey {
return
}
var value interface{}
switch v := kv.Value.(type) {
case *expvar.Int:
value = v.Value()
case *expvar.Float:
value = v.Value()
case *expvar.String:
value = v.Value()
case *Statistics:
value = v.Value()
case *Counter:
v.updateRate()
value = v.Value()
v.reset()
case *WordCounter:
value = v.Value()
default:
value = v.String()
}
result[strings.TrimPrefix(kv.Key, statsPrefix)] = value
})
return result
}
func memstats() interface{} {
stats := new(runtime.MemStats)
runtime.ReadMemStats(stats)
return map[string]interface{}{
"alloc": stats.Alloc,
"total_alloc": stats.TotalAlloc,
"sys": stats.Sys,
"heap_alloc": stats.HeapAlloc,
"heap_inuse": stats.HeapInuse,
"stack_inuse": stats.StackInuse,
"pause_total_ns": stats.PauseTotalNs,
"num_gc": stats.NumGC,
"gc_cpu_fraction": stats.GCCPUFraction,
}
}
// NewFloat returns a new Float stats object.
func NewFloat(name string) *expvar.Float {
return expvar.NewFloat(statsPrefix + name)
}
// NewInt returns a new Int stats object.
func NewInt(name string) *expvar.Int {
return expvar.NewInt(statsPrefix + name)
}
// NewString returns a new String stats object.
func NewString(name string) *expvar.String {
return expvar.NewString(statsPrefix + name)
}
// Target sets the target name.
func Target(target string) {
NewString(targetKey).Set(target)
}
// Edition sets the edition name.
func Edition(edition string) {
NewString(editionKey).Set(edition)
}
type Statistics struct {
min *atomic.Float64
max *atomic.Float64
count *atomic.Int64
avg *atomic.Float64
// required for stddev and stdvar
mean *atomic.Float64
value *atomic.Float64
}
// NewStatistics returns a new Statistics object.
// A Statistics object is thread-safe and computes statistics on the fly from recorded samples.
// Available statistics are:
// - min
// - max
// - avg
// - count
// - stddev
// - stdvar
func NewStatistics(name string) *Statistics {
s := &Statistics{
min: atomic.NewFloat64(math.Inf(0)),
max: atomic.NewFloat64(math.Inf(-1)),
count: atomic.NewInt64(0),
avg: atomic.NewFloat64(0),
mean: atomic.NewFloat64(0),
value: atomic.NewFloat64(0),
}
expvar.Publish(statsPrefix+name, s)
return s
}
func (s *Statistics) String() string {
b, _ := json.Marshal(s.Value())
return string(b)
}
func (s *Statistics) Value() map[string]interface{} {
stdvar := s.value.Load() / float64(s.count.Load())
stddev := math.Sqrt(stdvar)
min := s.min.Load()
max := s.max.Load()
result := map[string]interface{}{
"avg": s.avg.Load(),
"count": s.count.Load(),
}
if !math.IsInf(min, 0) {
result["min"] = min
}
if !math.IsInf(max, 0) {
result["max"] = s.max.Load()
}
if !math.IsNaN(stddev) {
result["stddev"] = stddev
}
if !math.IsNaN(stdvar) {
result["stdvar"] = stdvar
}
return result
}
func (s *Statistics) Record(v float64) {
for {
min := s.min.Load()
if min <= v {
break
}
if s.min.CAS(min, v) {
break
}
}
for {
max := s.max.Load()
if max >= v {
break
}
if s.max.CAS(max, v) {
break
}
}
for {
avg := s.avg.Load()
count := s.count.Load()
mean := s.mean.Load()
value := s.value.Load()
delta := v - mean
newCount := count + 1
newMean := mean + (delta / float64(newCount))
newValue := value + (delta * (v - newMean))
newAvg := avg + ((v - avg) / float64(newCount))
if s.avg.CAS(avg, newAvg) && s.count.CAS(count, newCount) && s.mean.CAS(mean, newMean) && s.value.CAS(value, newValue) {
break
}
}
}
type Counter struct {
total *atomic.Int64
rate *atomic.Float64
resetTime time.Time
}
// NewCounter returns a new Counter stats object.
func NewCounter(name string) *Counter {
c := &Counter{
total: atomic.NewInt64(0),
rate: atomic.NewFloat64(0),
resetTime: time.Now(),
}
expvar.Publish(statsPrefix+name, c)
return c
}
func (c *Counter) updateRate() {
total := c.total.Load()
c.rate.Store(float64(total) / time.Since(c.resetTime).Seconds())
}
func (c *Counter) reset() {
c.total.Store(0)
c.rate.Store(0)
c.resetTime = time.Now()
}
func (c *Counter) Inc(i int64) {
c.total.Add(i)
}
func (c *Counter) String() string {
b, _ := json.Marshal(c.Value())
return string(b)
}
func (c *Counter) Value() map[string]interface{} {
return map[string]interface{}{
"total": c.total.Load(),
"rate": c.rate.Load(),
}
}
type WordCounter struct {
words sync.Map
count *atomic.Int64
}
// NewWordCounter returns a new WordCounter stats object.
// A WordCounter object is thread-safe and counts the number of distinct words recorded.
func NewWordCounter(name string) *WordCounter {
c := &WordCounter{
count: atomic.NewInt64(0),
words: sync.Map{},
}
expvar.Publish(statsPrefix+name, c)
return c
}
func (w *WordCounter) Add(word string) {
if _, loaded := w.words.LoadOrStore(xxhash.Sum64String(word), struct{}{}); !loaded {
w.count.Add(1)
}
}
func (w *WordCounter) String() string {
b, _ := json.Marshal(w.Value())
return string(b)
}
func (w *WordCounter) Value() int64 {
return w.count.Load()
}
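
`Statistics.Record` is an online (single-pass) implementation of Welford's algorithm: it maintains a running mean and a running sum of squared deltas (the `value` field), from which `Value` derives stddev and stdvar without retaining any samples. A sketch of the same recurrence without the lock-free CAS loop, reproducing the numbers asserted in `TestStatistic` below:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	var (
		count int64
		mean  float64 // running mean
		m2    float64 // running sum of squared deltas ("value" in stats.go)
	)
	// Welford's update for each recorded sample.
	for _, v := range []float64{100, 200, 300} {
		count++
		delta := v - mean
		mean += delta / float64(count)
		m2 += delta * (v - mean)
	}
	stdvar := m2 / float64(count) // population variance
	fmt.Println(mean, stdvar, math.Sqrt(stdvar))
	// 200 6666.666666666667 81.64965809277261 — matching TestStatistic.
}
```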

@ -0,0 +1,97 @@
package usagestats
import (
"runtime"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/util/build"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/require"
)
func Test_BuildReport(t *testing.T) {
now := time.Now()
seed := &ClusterSeed{
UID: uuid.New().String(),
CreatedAt: now,
}
Edition("OSS")
Target("compactor")
NewString("compression").Set("lz4")
NewInt("compression_ratio").Set(100)
NewFloat("size_mb").Set(100.1)
NewCounter("lines_written").Inc(200)
s := NewStatistics("query_throughput")
s.Record(300)
s.Record(5)
w := NewWordCounter("active_tenants")
w.Add("foo")
w.Add("bar")
w.Add("foo")
r := buildReport(seed, now.Add(time.Hour))
require.Equal(t, r.Arch, runtime.GOARCH)
require.Equal(t, r.Os, runtime.GOOS)
require.Equal(t, r.PrometheusVersion, build.GetVersion())
require.Equal(t, r.Edition, "OSS")
require.Equal(t, r.Target, "compactor")
require.Equal(t, r.Metrics["num_cpu"], runtime.NumCPU())
require.Equal(t, r.Metrics["num_goroutine"], runtime.NumGoroutine())
require.Equal(t, r.Metrics["compression"], "lz4")
require.Equal(t, r.Metrics["compression_ratio"], int64(100))
require.Equal(t, r.Metrics["size_mb"], 100.1)
require.Equal(t, r.Metrics["lines_written"].(map[string]interface{})["total"], int64(200))
require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["min"], float64(5))
require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["max"], float64(300))
require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["count"], int64(2))
require.Equal(t, r.Metrics["query_throughput"].(map[string]interface{})["avg"], float64(300+5)/2)
require.Equal(t, r.Metrics["active_tenants"], int64(2))
out, _ := jsoniter.MarshalIndent(r, "", " ")
t.Log(string(out))
}
func TestCounter(t *testing.T) {
c := NewCounter("test_counter")
c.Inc(100)
c.Inc(200)
c.Inc(300)
time.Sleep(1 * time.Second)
c.updateRate()
v := c.Value()
require.Equal(t, int64(600), v["total"])
require.GreaterOrEqual(t, v["rate"], float64(590))
c.reset()
require.Equal(t, int64(0), c.Value()["total"])
require.Equal(t, float64(0), c.Value()["rate"])
}
func TestStatistic(t *testing.T) {
s := NewStatistics("test_stats")
s.Record(100)
s.Record(200)
s.Record(300)
v := s.Value()
require.Equal(t, float64(100), v["min"])
require.Equal(t, float64(300), v["max"])
require.Equal(t, int64(3), v["count"])
require.Equal(t, float64(100+200+300)/3, v["avg"])
require.Equal(t, float64(81.64965809277261), v["stddev"])
require.Equal(t, float64(6666.666666666667), v["stdvar"])
}
func TestWordCounter(t *testing.T) {
w := NewWordCounter("test_words_count")
// Wait for all goroutines to finish before asserting, otherwise the test is racy.
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
w.Add("foo")
w.Add("bar")
w.Add("foo")
}()
}
wg.Wait()
require.Equal(t, int64(2), w.Value())
}

@ -4,6 +4,7 @@ import (
"runtime"
"github.com/prometheus/common/version"
prom "github.com/prometheus/prometheus/web/api/v1"
)
// Version information passed to Prometheus version package.
@ -27,3 +28,14 @@ func init() {
version.BuildDate = BuildDate
version.GoVersion = runtime.Version()
}
func GetVersion() prom.PrometheusVersion {
return prom.PrometheusVersion{
Version: version.Version,
Revision: version.Revision,
Branch: version.Branch,
BuildUser: version.BuildUser,
BuildDate: version.BuildDate,
GoVersion: version.GoVersion,
}
}

@ -41,7 +41,7 @@ func NotFoundHandler(w http.ResponseWriter, r *http.Request) {
func JSONError(w http.ResponseWriter, code int, message string, args ...interface{}) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(code)
json.NewEncoder(w).Encode(ErrorResponseBody{
_ = json.NewEncoder(w).Encode(ErrorResponseBody{
Code: code,
Status: "error",
Message: fmt.Sprintf(message, args...),

@ -32,58 +32,78 @@ func Test_writeError(t *testing.T) {
}{
{"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest},
{"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
{"rpc cancelled",
{
"rpc cancelled",
status.New(codes.Canceled, context.Canceled.Error()).Err(),
"rpc error: code = Canceled desc = context canceled",
http.StatusInternalServerError},
{"rpc cancelled multi",
http.StatusInternalServerError,
},
{
"rpc cancelled multi",
util.MultiError{status.New(codes.Canceled, context.Canceled.Error()).Err(), status.New(codes.Canceled, context.Canceled.Error()).Err()},
"2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled",
http.StatusInternalServerError},
{"mixed context and rpc cancelled",
http.StatusInternalServerError,
},
{
"mixed context and rpc cancelled",
util.MultiError{context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()},
"2 errors: context canceled; rpc error: code = Canceled desc = context canceled",
http.StatusInternalServerError},
{"mixed context, rpc cancelled and another",
http.StatusInternalServerError,
},
{
"mixed context, rpc cancelled and another",
util.MultiError{errors.New("standard error"), context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()},
"3 errors: standard error; context canceled; rpc error: code = Canceled desc = context canceled",
http.StatusInternalServerError},
http.StatusInternalServerError,
},
{"cancelled storage", promql.ErrStorage{Err: context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
{"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest},
{"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout},
{"deadline multi", util.MultiError{context.DeadlineExceeded, context.DeadlineExceeded}, ErrDeadlineExceeded, http.StatusGatewayTimeout},
{"rpc deadline", status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), ErrDeadlineExceeded, http.StatusGatewayTimeout},
{"rpc deadline multi",
util.MultiError{status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(),
status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()},
{
"rpc deadline multi",
util.MultiError{
status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(),
status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(),
},
ErrDeadlineExceeded,
http.StatusGatewayTimeout},
{"mixed context and rpc deadline",
http.StatusGatewayTimeout,
},
{
"mixed context and rpc deadline",
util.MultiError{context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()},
ErrDeadlineExceeded,
http.StatusGatewayTimeout},
{"mixed context, rpc deadline and another",
http.StatusGatewayTimeout,
},
{
"mixed context, rpc deadline and another",
util.MultiError{errors.New("standard error"), context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()},
"3 errors: standard error; context deadline exceeded; rpc error: code = DeadlineExceeded desc = context deadline exceeded",
http.StatusInternalServerError},
http.StatusInternalServerError,
},
{"parse error", logqlmodel.ParseError{}, "parse error : ", http.StatusBadRequest},
{"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest},
{"internal", errors.New("foo"), "foo", http.StatusInternalServerError},
{"query error", chunk.ErrQueryMustContainMetricName, chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest},
{"wrapped query error",
{
"wrapped query error",
fmt.Errorf("wrapped: %w", chunk.ErrQueryMustContainMetricName),
"wrapped: " + chunk.ErrQueryMustContainMetricName.Error(),
http.StatusBadRequest},
{"multi mixed",
http.StatusBadRequest,
},
{
"multi mixed",
util.MultiError{context.Canceled, context.DeadlineExceeded},
"2 errors: context canceled; context deadline exceeded",
http.StatusInternalServerError},
http.StatusInternalServerError,
},
} {
t.Run(tt.name, func(t *testing.T) {
rec := httptest.NewRecorder()
WriteError(tt.err, rec)
res := &ErrorResponseBody{}
json.NewDecoder(rec.Result().Body).Decode(res)
_ = json.NewDecoder(rec.Result().Body).Decode(res)
require.Equal(t, tt.expectedStatus, res.Code)
require.Equal(t, tt.expectedStatus, rec.Result().StatusCode)
require.Equal(t, tt.expectedMsg, res.Message)

@ -1,118 +0,0 @@
// Copyright 2021 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"bytes"
"database/sql/driver"
"encoding/json"
"fmt"
)
var jsonNull = []byte("null")
// NullUUID represents a UUID that may be null.
// NullUUID implements the SQL driver.Scanner interface so
// it can be used as a scan destination:
//
// var u uuid.NullUUID
// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&u)
// ...
// if u.Valid {
// // use u.UUID
// } else {
// // NULL value
// }
//
type NullUUID struct {
UUID UUID
Valid bool // Valid is true if UUID is not NULL
}
// Scan implements the SQL driver.Scanner interface.
func (nu *NullUUID) Scan(value interface{}) error {
if value == nil {
nu.UUID, nu.Valid = Nil, false
return nil
}
err := nu.UUID.Scan(value)
if err != nil {
nu.Valid = false
return err
}
nu.Valid = true
return nil
}
// Value implements the driver Valuer interface.
func (nu NullUUID) Value() (driver.Value, error) {
if !nu.Valid {
return nil, nil
}
// Delegate to UUID Value function
return nu.UUID.Value()
}
// MarshalBinary implements encoding.BinaryMarshaler.
func (nu NullUUID) MarshalBinary() ([]byte, error) {
if nu.Valid {
return nu.UUID[:], nil
}
return []byte(nil), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (nu *NullUUID) UnmarshalBinary(data []byte) error {
if len(data) != 16 {
return fmt.Errorf("invalid UUID (got %d bytes)", len(data))
}
copy(nu.UUID[:], data)
nu.Valid = true
return nil
}
// MarshalText implements encoding.TextMarshaler.
func (nu NullUUID) MarshalText() ([]byte, error) {
if nu.Valid {
return nu.UUID.MarshalText()
}
return jsonNull, nil
}
// UnmarshalText implements encoding.TextUnmarshaler.
func (nu *NullUUID) UnmarshalText(data []byte) error {
id, err := ParseBytes(data)
if err != nil {
nu.Valid = false
return err
}
nu.UUID = id
nu.Valid = true
return nil
}
// MarshalJSON implements json.Marshaler.
func (nu NullUUID) MarshalJSON() ([]byte, error) {
if nu.Valid {
return json.Marshal(nu.UUID)
}
return jsonNull, nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (nu *NullUUID) UnmarshalJSON(data []byte) error {
if bytes.Equal(data, jsonNull) {
*nu = NullUUID{}
return nil // valid null UUID
}
err := json.Unmarshal(data, &nu.UUID)
nu.Valid = err == nil
return err
}

@ -12,7 +12,6 @@ import (
"fmt"
"io"
"strings"
"sync"
)
// A UUID is a 128 bit (16 byte) Universal Unique IDentifier as defined in RFC
@ -34,15 +33,7 @@ const (
Future // Reserved for future definition.
)
const randPoolSize = 16 * 16
var (
rander = rand.Reader // random function
poolEnabled = false
poolMu sync.Mutex
poolPos = randPoolSize // protected with poolMu
pool [randPoolSize]byte // protected with poolMu
)
var rander = rand.Reader // random function
type invalidLengthError struct{ len int }
@ -50,12 +41,6 @@ func (err invalidLengthError) Error() string {
return fmt.Sprintf("invalid UUID length: %d", err.len)
}
// IsInvalidLengthError is matcher function for custom error invalidLengthError
func IsInvalidLengthError(err error) bool {
_, ok := err.(invalidLengthError)
return ok
}
// Parse decodes s into a UUID or returns an error. Both the standard UUID
// forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and
// urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the
@ -264,31 +249,3 @@ func SetRand(r io.Reader) {
}
rander = r
}
// EnableRandPool enables internal randomness pool used for Random
// (Version 4) UUID generation. The pool contains random bytes read from
// the random number generator on demand in batches. Enabling the pool
// may improve the UUID generation throughput significantly.
//
// Since the pool is stored on the Go heap, this feature may be a bad fit
// for security sensitive applications.
//
// Both EnableRandPool and DisableRandPool are not thread-safe and should
// only be called when there is no possibility that New or any other
// UUID Version 4 generation function will be called concurrently.
func EnableRandPool() {
poolEnabled = true
}
// DisableRandPool disables the randomness pool if it was previously
// enabled with EnableRandPool.
//
// Both EnableRandPool and DisableRandPool are not thread-safe and should
// only be called when there is no possibility that New or any other
// UUID Version 4 generation function will be called concurrently.
func DisableRandPool() {
poolEnabled = false
defer poolMu.Unlock()
poolMu.Lock()
poolPos = randPoolSize
}

@ -27,8 +27,6 @@ func NewString() string {
// The strength of the UUIDs is based on the strength of the crypto/rand
// package.
//
// Uses the randomness pool if it was enabled with EnableRandPool.
//
// A note about uniqueness derived from the UUID Wikipedia entry:
//
// Randomly generated UUIDs have 122 random bits. One's annual risk of being
@ -37,10 +35,7 @@ func NewString() string {
// equivalent to the odds of creating a few tens of trillions of UUIDs in a
// year and having one duplicate.
func NewRandom() (UUID, error) {
if !poolEnabled {
return NewRandomFromReader(rander)
}
return newRandomFromPool()
return NewRandomFromReader(rander)
}
// NewRandomFromReader returns a UUID based on bytes read from a given io.Reader.
@ -54,23 +49,3 @@ func NewRandomFromReader(r io.Reader) (UUID, error) {
uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10
return uuid, nil
}
func newRandomFromPool() (UUID, error) {
var uuid UUID
poolMu.Lock()
if poolPos == randPoolSize {
_, err := io.ReadFull(rander, pool[:])
if err != nil {
poolMu.Unlock()
return Nil, err
}
poolPos = 0
}
copy(uuid[:], pool[poolPos:(poolPos+16)])
poolPos += 16
poolMu.Unlock()
uuid[6] = (uuid[6] & 0x0f) | 0x40 // Version 4
uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10
return uuid, nil
}

@ -10,9 +10,9 @@ import (
// Config configures a Backoff
type Config struct {
MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level
MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level
MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries
MinBackoff time.Duration `yaml:"min_period" category:"advanced"` // start backoff at this level
MaxBackoff time.Duration `yaml:"max_period" category:"advanced"` // increase exponentially to this level
MaxRetries int `yaml:"max_retries" category:"advanced"` // give up after this many; zero means infinite retries
}
// RegisterFlagsWithPrefix for Config.
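
The category tags only affect generated docs; the config itself still drives dskit's usual retry loop. A minimal sketch of that loop, using only the backoff package's public API:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// callWithRetries retries do() with exponential backoff between attempts.
func callWithRetries(ctx context.Context, do func() error) error {
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond, // first delay
		MaxBackoff: 10 * time.Second,       // cap for the exponential growth
		MaxRetries: 5,                      // zero means retry forever
	})
	for b.Ongoing() {
		if err := do(); err == nil {
			return nil
		}
		b.Wait()
	}
	return b.Err() // explains why the loop stopped (retries exhausted or context done)
}

func main() {
	err := callWithRetries(context.Background(), func() error { return errors.New("boom") })
	fmt.Println(err)
}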

@ -4,6 +4,7 @@ import (
"context"
"sync"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/grafana/dskit/internal/math"
@ -62,45 +63,53 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun
// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers.
// The execution breaks on first error encountered.
//
// Deprecated: use ForEachJob instead.
func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error {
if len(jobs) == 0 {
return nil
return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error {
return jobFunc(ctx, jobs[idx])
})
}
// CreateJobsFromStrings is a utility to create jobs from a slice of strings.
//
// Deprecated: will be removed as it's not needed when using ForEachJob.
func CreateJobsFromStrings(values []string) []interface{} {
jobs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
jobs[i] = values[i]
}
return jobs
}
// Push all jobs to a channel.
ch := make(chan interface{}, len(jobs))
for _, job := range jobs {
ch <- job
// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers.
// The execution breaks on first error encountered.
func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error {
if jobs == 0 {
return nil
}
close(ch)
// Initialise indexes with -1 so first Inc() returns index 0.
indexes := atomic.NewInt64(-1)
// Start workers to process jobs.
g, ctx := errgroup.WithContext(ctx)
for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ {
for ix := 0; ix < math.Min(concurrency, jobs); ix++ {
g.Go(func() error {
for job := range ch {
if err := ctx.Err(); err != nil {
return err
for ctx.Err() == nil {
idx := int(indexes.Inc())
if idx >= jobs {
return nil
}
if err := jobFunc(ctx, job); err != nil {
if err := jobFunc(ctx, idx); err != nil {
return err
}
}
return nil
return ctx.Err()
})
}
// Wait until done (or context has canceled).
return g.Wait()
}
// CreateJobsFromStrings is a utility to create jobs from a slice of strings.
func CreateJobsFromStrings(values []string) []interface{} {
jobs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
jobs[i] = values[i]
}
return jobs
}
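
ForEach is now a thin wrapper over the index-based ForEachJob, which removes the []interface{} boxing and the job channel. A short sketch of the new entry point:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	urls := []string{"a", "b", "c", "d", "e"}
	results := make([]string, len(urls)) // one slot per index, so no locking needed
	err := concurrency.ForEachJob(context.Background(), len(urls), 2, func(ctx context.Context, idx int) error {
		results[idx] = "fetched:" + urls[idx] // at most 2 of these run at once
		return nil
	})
	fmt.Println(results, err)
}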

@ -13,11 +13,11 @@ import (
// ClientConfig is the config for client TLS.
type ClientConfig struct {
CertPath string `yaml:"tls_cert_path"`
KeyPath string `yaml:"tls_key_path"`
CAPath string `yaml:"tls_ca_path"`
ServerName string `yaml:"tls_server_name"`
InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
CAPath string `yaml:"tls_ca_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify" category:"advanced"`
}
var (

@ -18,16 +18,16 @@ import (
// Config for a gRPC client.
type Config struct {
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
GRPCCompression string `yaml:"grpc_compression"`
RateLimit float64 `yaml:"rate_limit"`
RateLimitBurst int `yaml:"rate_limit_burst"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
MaxSendMsgSize int `yaml:"max_send_msg_size" category:"advanced"`
GRPCCompression string `yaml:"grpc_compression" category:"advanced"`
RateLimit float64 `yaml:"rate_limit" category:"advanced"`
RateLimitBurst int `yaml:"rate_limit_burst" category:"advanced"`
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits" category:"advanced"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
TLSEnabled bool `yaml:"tls_enabled"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
}

@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels {
// The NewInMemoryKVClient returned by NewClient() is a singleton, so
// that distributors and ingesters started in the same process can
// find themselves.
var inmemoryStoreInit sync.Once
var inmemoryStore Client
var (
inmemoryStoreInit sync.Once
inmemoryStore *consul.Client
)
// StoreConfig is a configuration used for building single store client, either
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
@ -53,7 +55,7 @@ type StoreConfig struct {
// where store can be consul or inmemory.
type Config struct {
Store string `yaml:"store"`
Prefix string `yaml:"prefix"`
Prefix string `yaml:"prefix" category:"advanced"`
StoreConfig `yaml:",inline"`
Mock Client `yaml:"-"`
@ -76,8 +78,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f
if flagsPrefix == "" {
flagsPrefix = "ring."
}
// Allow clients to override default store by setting it before calling this method.
if cfg.Store == "" {
cfg.Store = "consul"
}
f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.")
f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.")
f.StringVar(&cfg.Store, flagsPrefix+"store", cfg.Store, "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.")
}
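
The override hook means callers can choose a different default backend without post-processing flags; a sketch assuming the standard flag set:

package main

import (
	"flag"
	"fmt"

	"github.com/grafana/dskit/kv"
)

func main() {
	var cfg kv.Config
	cfg.Store = "memberlist" // pre-set before registration; kept as the default now
	cfg.RegisterFlagsWithPrefix("ring.", "collectors/", flag.CommandLine)
	flag.Parse()
	fmt.Println(cfg.Store) // "memberlist" unless -ring.store overrides it
}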
// Client is a high-level client for key-value stores (such as Etcd and
@ -140,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
inmemoryStoreInit.Do(func() {
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg)
})
client = inmemoryStore
// However, we swap the codec so that we can encode different types of values.
client = inmemoryStore.WithCodec(codec)
case "memberlist":
kv, err := cfg.MemberlistKV()

@ -40,11 +40,11 @@ var (
// Config to create a ConsulClient
type Config struct {
Host string `yaml:"host"`
ACLToken string `yaml:"acl_token"`
HTTPClientTimeout time.Duration `yaml:"http_client_timeout"`
ConsistentReads bool `yaml:"consistent_reads"`
WatchKeyRateLimit float64 `yaml:"watch_rate_limit"` // Zero disables rate limit
WatchKeyBurstSize int `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1
ACLToken string `yaml:"acl_token" category:"advanced"`
HTTPClientTimeout time.Duration `yaml:"http_client_timeout" category:"advanced"`
ConsistentReads bool `yaml:"consistent_reads" category:"advanced"`
WatchKeyRateLimit float64 `yaml:"watch_rate_limit" category:"advanced"` // Zero disables rate limit
WatchKeyBurstSize int `yaml:"watch_burst_size" category:"advanced"` // Burst when doing rate-limit, defaults to 1
// Used in tests only.
MaxCasRetries int `yaml:"-"`
@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
}
kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx))
// Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value,
// and next call to Get will block as expected. We handle missing value below.
if err != nil {
@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter {
}
return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst)
}
// WithCodec clones the consul client and changes its codec.
func (c *Client) WithCodec(codec codec.Codec) *Client {
new := *c
new.codec = codec
return &new
}
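
WithCodec is what lets createClient reuse the in-memory singleton for values of different types; a sketch where ringCodec and statsCodec stand in for real codec.Codec implementations:

package kvexample

import (
	"github.com/go-kit/log"
	"github.com/grafana/dskit/kv/codec"
	"github.com/grafana/dskit/kv/consul"
)

// cloneWithCodec mirrors the createClient pattern: one shared in-memory
// store, cloned per caller so each encodes its own value type.
func cloneWithCodec(ringCodec, statsCodec codec.Codec) (*consul.Client, *consul.Client) {
	base, _ := consul.NewInMemoryClient(ringCodec, log.NewNopLogger(), nil)
	return base, base.WithCodec(statsCodec)
}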

@ -22,9 +22,9 @@ import (
// Config for a new etcd.Client.
type Config struct {
Endpoints []string `yaml:"endpoints"`
DialTimeout time.Duration `yaml:"dial_timeout"`
MaxRetries int `yaml:"max_retries"`
EnableTLS bool `yaml:"tls_enabled"`
DialTimeout time.Duration `yaml:"dial_timeout" category:"advanced"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
EnableTLS bool `yaml:"tls_enabled" category:"advanced"`
TLS dstls.ClientConfig `yaml:",inline"`
UserName string `yaml:"username"`

@ -124,16 +124,16 @@ func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error {
// KVConfig is a config for memberlist.KV
type KVConfig struct {
// Memberlist options.
NodeName string `yaml:"node_name"`
RandomizeNodeName bool `yaml:"randomize_node_name"`
StreamTimeout time.Duration `yaml:"stream_timeout"`
RetransmitMult int `yaml:"retransmit_factor"`
PushPullInterval time.Duration `yaml:"pull_push_interval"`
GossipInterval time.Duration `yaml:"gossip_interval"`
GossipNodes int `yaml:"gossip_nodes"`
GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"`
DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"`
EnableCompression bool `yaml:"compression_enabled"`
NodeName string `yaml:"node_name" category:"advanced"`
RandomizeNodeName bool `yaml:"randomize_node_name" category:"advanced"`
StreamTimeout time.Duration `yaml:"stream_timeout" category:"advanced"`
RetransmitMult int `yaml:"retransmit_factor" category:"advanced"`
PushPullInterval time.Duration `yaml:"pull_push_interval" category:"advanced"`
GossipInterval time.Duration `yaml:"gossip_interval" category:"advanced"`
GossipNodes int `yaml:"gossip_nodes" category:"advanced"`
GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"`
DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"`
EnableCompression bool `yaml:"compression_enabled" category:"advanced"`
// ip:port to advertise other cluster members. Used for NAT traversal
AdvertiseAddr string `yaml:"advertise_addr"`
@ -141,20 +141,20 @@ type KVConfig struct {
// List of members to join
JoinMembers flagext.StringSlice `yaml:"join_members"`
MinJoinBackoff time.Duration `yaml:"min_join_backoff"`
MaxJoinBackoff time.Duration `yaml:"max_join_backoff"`
MaxJoinRetries int `yaml:"max_join_retries"`
MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"`
MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"`
MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"`
AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"`
RejoinInterval time.Duration `yaml:"rejoin_interval"`
RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"`
// Remove LEFT ingesters from ring after this timeout.
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"`
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"`
// Timeout used when leaving the memberlist cluster.
LeaveTimeout time.Duration `yaml:"leave_timeout"`
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
// How much space to use to keep received and sent messages in memory (for troubleshooting).
MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"`
MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"`
TCPTransport TCPTransportConfig `yaml:",inline"`

@ -45,10 +45,10 @@ type TCPTransportConfig struct {
// Timeout used when making connections to other nodes to send packet.
// Zero = no timeout
PacketDialTimeout time.Duration `yaml:"packet_dial_timeout"`
PacketDialTimeout time.Duration `yaml:"packet_dial_timeout" category:"advanced"`
// Timeout for writing packet data. Zero = no timeout.
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"`
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"`
// Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on
TransportDebug bool `yaml:"-"`
@ -57,7 +57,7 @@ type TCPTransportConfig struct {
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`
TLSEnabled bool `yaml:"tls_enabled"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS dstls.ClientConfig `yaml:",inline"`
}

@ -16,11 +16,11 @@ import (
// MultiConfig is a configuration for MultiClient.
type MultiConfig struct {
Primary string `yaml:"primary"`
Secondary string `yaml:"secondary"`
Primary string `yaml:"primary" category:"advanced"`
Secondary string `yaml:"secondary" category:"advanced"`
MirrorEnabled bool `yaml:"mirror_enabled"`
MirrorTimeout time.Duration `yaml:"mirror_timeout"`
MirrorEnabled bool `yaml:"mirror_enabled" category:"advanced"`
MirrorTimeout time.Duration `yaml:"mirror_timeout" category:"advanced"`
// ConfigProvider returns channel with MultiRuntimeConfig updates.
ConfigProvider func() <-chan MultiRuntimeConfig `yaml:"-"`

@ -3,6 +3,7 @@ package ring
import (
"context"
"fmt"
"net/http"
"sort"
"sync"
"time"
@ -491,3 +492,20 @@ func (l *BasicLifecycler) run(fn func() error) error {
return <-errCh
}
}
func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return l.store.CAS(ctx, l.ringKey, f)
}
func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) {
obj, err := l.store.Get(ctx, l.ringKey)
if err != nil {
return nil, err
}
return GetOrCreateRingDesc(obj), nil
}
func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req)
}
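
casRing follows the kv store's usual CAS contract: the callback gets the current value (possibly nil) and returns the new value, whether to retry on conflict, and an error. A hedged sketch of an update written against that contract (markInstanceLeaving and the instance ID are made up, and it assumes code inside the ring package):

package ring

import "context"

// markInstanceLeaving flips one instance to LEAVING through the lifecycler's
// CAS helper; "ingester-1" is a hypothetical instance ID.
func markInstanceLeaving(ctx context.Context, l *BasicLifecycler) error {
	return l.casRing(ctx, func(in interface{}) (interface{}, bool, error) {
		desc := GetOrCreateRingDesc(in)
		if ing, ok := desc.Ingesters["ingester-1"]; ok {
			ing.State = LEAVING
			desc.Ingesters["ingester-1"] = ing
		}
		return desc, true, nil // retry on CAS conflict
	})
}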

@ -10,8 +10,6 @@ import (
"sort"
"strings"
"time"
"github.com/go-kit/log/level"
)
const pageContent = `
@ -90,19 +88,6 @@ func init() {
pageTemplate = template.Must(t.Parse(pageContent))
}
func (r *Ring) forget(ctx context.Context, id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}
ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return r.KVClient.CAS(ctx, r.key, unregister)
}
type ingesterDesc struct {
ID string `json:"id"`
State string `json:"state"`
@ -121,11 +106,33 @@ type httpResponse struct {
ShowTokens bool `json:"-"`
}
func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
type ringAccess interface {
casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error
getRing(context.Context) (*Desc, error)
}
type ringPageHandler struct {
r ringAccess
heartbeatPeriod time.Duration
}
func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler {
return &ringPageHandler{
r: r,
heartbeatPeriod: heartbeatPeriod,
}
}
func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
ingesterID := req.FormValue("forget")
if err := r.forget(req.Context(), ingesterID); err != nil {
level.Error(r.logger).Log("msg", "error forgetting instance", "err", err)
if err := h.forget(req.Context(), ingesterID); err != nil {
http.Error(
w,
fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(),
http.StatusInternalServerError,
)
return
}
// Implement PRG pattern to prevent double-POST and work with CSRF middleware.
@ -140,23 +147,26 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
r.mtx.RLock()
defer r.mtx.RUnlock()
ringDesc, err := h.r.getRing(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, ownedTokens := ringDesc.countTokens()
ingesterIDs := []string{}
for id := range r.ringDesc.Ingesters {
for id := range ringDesc.Ingesters {
ingesterIDs = append(ingesterIDs, id)
}
sort.Strings(ingesterIDs)
now := time.Now()
var ingesters []ingesterDesc
_, owned := r.countTokens()
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
ing := ringDesc.Ingesters[id]
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(&ing, Reporting, now) {
if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) {
state = unhealthy
}
@ -175,7 +185,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100,
Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100,
})
}
@ -203,6 +213,19 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ
}
}
func (h *ringPageHandler) forget(ctx context.Context, id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}
ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return h.r.casRing(ctx, unregister)
}
// writeJSONResponse writes some JSON as an HTTP response.
func writeJSONResponse(w http.ResponseWriter, v httpResponse) {
w.Header().Set("Content-Type", "application/json")

@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"sort"
"sync"
@ -26,17 +27,20 @@ type LifecyclerConfig struct {
RingConfig Config `yaml:"ring"`
// Config for the ingester lifecycle control
NumTokens int `yaml:"num_tokens"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
ObservePeriod time.Duration `yaml:"observe_period"`
JoinAfter time.Duration `yaml:"join_after"`
MinReadyDuration time.Duration `yaml:"min_ready_duration"`
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
NumTokens int `yaml:"num_tokens" category:"advanced"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"`
ObservePeriod time.Duration `yaml:"observe_period" category:"advanced"`
JoinAfter time.Duration `yaml:"join_after" category:"advanced"`
MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"`
InfNames []string `yaml:"interface_names"`
// FinalSleep's default value can be overridden by
// setting it before calling RegisterFlags or RegisterFlagsWithPrefix.
FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"`
TokensFilePath string `yaml:"tokens_file_path"`
Zone string `yaml:"availability_zone"`
UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`
ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health"`
UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"`
ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health" category:"advanced"`
// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address" doc:"hidden"`
@ -47,12 +51,14 @@ type LifecyclerConfig struct {
ListenPort int `yaml:"-"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet
// RegisterFlags adds the flags required to config this to the given FlagSet.
// The default values of some flags can be changed; see docs of LifecyclerConfig.
func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet.
// The default values of some flags can be changed; see docs of LifecyclerConfig.
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f)
@ -67,7 +73,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.")
f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.")
f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive ring updates.")
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", cfg.FinalSleep, "Duration to sleep for before exiting, to ensure metrics are scraped.")
f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
hostname, err := os.Hostname()
@ -849,6 +855,23 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
time.Sleep(i.cfg.FinalSleep)
}
func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return i.KVStore.CAS(ctx, i.RingKey, f)
}
func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) {
obj, err := i.KVStore.Get(ctx, i.RingKey)
if err != nil {
return nil, err
}
return GetOrCreateRingDesc(obj), nil
}
func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req)
}
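
Ring, Lifecycler, and BasicLifecycler now all satisfy http.Handler through the shared page handler, so exposing the status/forget page is a single mux registration; the path is just a convention:

package ringpage

import (
	"net/http"

	"github.com/grafana/dskit/ring"
)

// mountRingPage serves the ring page: GET renders it, POST with a "forget"
// form value removes the named instance via CAS.
func mountRingPage(mux *http.ServeMux, lc *ring.Lifecycler) {
	mux.Handle("/ring", lc)
}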
// unregister removes our entry from consul.
func (i *Lifecycler) unregister(ctx context.Context) error {
level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName)

@ -20,8 +20,9 @@ type ReplicationSet struct {
MaxUnavailableZones int
}
// Do function f in parallel for all replicas in the set, erroring is we exceed
// Do function f in parallel for all replicas in the set, erroring if we exceed
// MaxErrors and returning early otherwise.
// Return a slice of all results from f, or nil if an error occurred.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
type instanceResult struct {
res interface{}

@ -8,11 +8,13 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -513,27 +515,32 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}
// countTokens returns the number of tokens and tokens within the range for each instance.
// The ring read lock must be already taken when calling this function.
func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
owned := map[string]uint32{}
numTokens := map[string]uint32{}
for i, token := range r.ringTokens {
func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) {
var (
owned = map[string]uint32{}
numTokens = map[string]uint32{}
ringTokens = r.GetTokens()
ringInstanceByToken = r.getTokensInfo()
)
for i, token := range ringTokens {
var diff uint32
// Compute how many tokens are within the range.
if i+1 == len(r.ringTokens) {
diff = (math.MaxUint32 - token) + r.ringTokens[0]
if i+1 == len(ringTokens) {
diff = (math.MaxUint32 - token) + ringTokens[0]
} else {
diff = r.ringTokens[i+1] - token
diff = ringTokens[i+1] - token
}
info := r.ringInstanceByToken[token]
info := ringInstanceByToken[token]
numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1
owned[info.InstanceID] = owned[info.InstanceID] + diff
}
// Set to 0 the number of owned tokens by instances which don't have tokens yet.
for id := range r.ringDesc.Ingesters {
for id := range r.Ingesters {
if _, ok := owned[id]; !ok {
owned[id] = 0
numTokens[id] = 0
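
The wrap-around arithmetic is easiest to check with toy numbers; a self-contained sketch with two instances holding one token each:

package main

import (
	"fmt"
	"math"
)

// Token ownership on the uint32 ring: each token owns the range up to the
// next token, and the last token wraps around to the first.
func main() {
	tokens := []uint32{1 << 30, 3 << 30} // instance 0 at 2^30, instance 1 at 3*2^30
	owned := make([]uint32, len(tokens))
	for i, t := range tokens {
		if i+1 == len(tokens) {
			owned[i] = (math.MaxUint32 - t) + tokens[0] // wrap to the first token
		} else {
			owned[i] = tokens[i+1] - t
		}
	}
	// Both print ~50%: each instance owns half the key space.
	for i, o := range owned {
		fmt.Printf("instance %d owns %.1f%%\n", i, float64(o)/float64(math.MaxUint32)*100)
	}
}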
@ -582,7 +589,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
prevOwners := r.reportedOwners
r.reportedOwners = make(map[string]struct{})
numTokens, ownedRange := r.countTokens()
numTokens, ownedRange := r.ringDesc.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
@ -840,6 +847,23 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) {
}
}
func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return r.KVClient.CAS(ctx, r.key, f)
}
func (r *Ring) getRing(ctx context.Context) (*Desc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
ringDesc := proto.Clone(r.ringDesc).(*Desc)
return ringDesc, nil
}
func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req)
}
// Operation describes which instances can be included in the replica set, based on their state.
//
// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.

@ -26,7 +26,7 @@ type Loader func(r io.Reader) (interface{}, error)
// Config holds the config for an Manager instance.
// It holds config related to loading per-tenant config.
type Config struct {
ReloadPeriod time.Duration `yaml:"period"`
ReloadPeriod time.Duration `yaml:"period" category:"advanced"`
// LoadPath contains the path to the runtime config file, requires an
// non-empty value
LoadPath string `yaml:"file"`

@ -590,7 +590,7 @@ github.com/google/pprof/profile
# github.com/google/renameio/v2 v2.0.0
## explicit; go 1.13
github.com/google/renameio/v2
# github.com/google/uuid v1.3.0
# github.com/google/uuid v1.2.0
## explicit
github.com/google/uuid
# github.com/googleapis/gax-go/v2 v2.1.1
@ -623,7 +623,7 @@ github.com/gorilla/mux
# github.com/gorilla/websocket v1.4.2
## explicit; go 1.12
github.com/gorilla/websocket
# github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5
# github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0
## explicit; go 1.16
github.com/grafana/dskit/backoff
github.com/grafana/dskit/concurrency
