Fixes memberlist usage report (#5369)

* Fixes memberlist usage report

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the linter and improve comment

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/5372/head
Cyril Tovena 4 years ago committed by GitHub
parent b19ce6230a
commit d44b6892eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      pkg/loki/loki.go
  2. 1
      pkg/loki/modules.go
  3. 54
      pkg/usagestats/reporter.go
  4. 8
      pkg/usagestats/reporter_test.go
  5. 51
      pkg/usagestats/seed.go
  6. 103
      pkg/usagestats/seed_test.go
  7. 2
      pkg/usagestats/stats.go

@ -494,6 +494,7 @@ func (t *Loki) setupModuleManager() error {
// Add dependencies
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
UsageReport: {},
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
@ -540,6 +541,12 @@ func (t *Loki) setupModuleManager() error {
t.deps = deps
t.ModuleManager = mm
if t.isModuleActive(Ingester) {
if err := mm.AddDependency(UsageReport, Ring); err != nil {
return err
}
}
return nil
}

@ -674,6 +674,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
usagestats.JSONCodec,
}
dnsProviderReg := prometheus.WrapRegistererWithPrefix(

@ -32,7 +32,10 @@ const (
var (
reportCheckInterval = time.Minute
reportInterval = 1 * time.Hour
reportInterval = 4 * time.Hour
stabilityCheckInterval = 5 * time.Second
stabilityMinimunRequired = 6
)
type Config struct {
@ -80,11 +83,12 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
return nil
}
// Try to become leader via the kv client
for backoff := backoff.New(ctx, backoff.Config{
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: time.Minute,
MaxRetries: 0,
}); ; backoff.Ongoing() {
})
for backoff.Ongoing() {
// create a new cluster seed
seed := ClusterSeed{
UID: uuid.NewString(),
@ -94,16 +98,19 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
// The key is already set, so we don't need to do anything
if in != nil {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
seed = *kvSeed
return nil, false, nil
}
}
return seed, true, nil
return &seed, true, nil
}); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
// ensure stability of the cluster seed
stableSeed := ensureStableKey(ctx, kvClient, rep.logger)
seed = *stableSeed
// Fetch the remote cluster seed.
remoteSeed, err := rep.fetchSeed(ctx,
func(err error) bool {
@ -115,14 +122,50 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
// we are the leader and we need to save the file.
if err := rep.writeSeedFile(ctx, seed); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
backoff.Wait()
continue
}
return &seed
}
backoff.Wait()
continue
}
return remoteSeed
}
return nil
}
// ensureStableKey blocks until the value stored under seedKey has kept the
// same UID for stabilityMinimunRequired consecutive checks, one check per
// stabilityCheckInterval (~30 seconds with the defaults). This is required
// for gossip-based kv clients such as memberlist, where members may initially
// hold different seeds and only converge eventually.
//
// NOTE(review): the loop never selects on ctx.Done(); if ctx is cancelled it
// relies on kvClient.Get failing and keeps retrying forever — confirm callers
// do not depend on cancellation to unblock this function.
func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger) *ClusterSeed {
	var (
		lastSeen  *ClusterSeed
		stableFor int
	)
	for {
		time.Sleep(stabilityCheckInterval)
		value, err := kvClient.Get(ctx, seedKey)
		if err != nil {
			level.Debug(logger).Log("msg", "failed to get cluster seed key for stability check", "err", err)
			continue
		}
		seed, ok := value.(*ClusterSeed)
		if !ok || seed == nil {
			// Nothing usable stored yet; try again on the next tick.
			continue
		}
		switch {
		case lastSeen == nil:
			// First successful read: start the stability window here.
			lastSeen = seed
		case lastSeen.UID != seed.UID:
			// The seed changed under us; restart the stability window.
			lastSeen = seed
			stableFor = 0
		default:
			stableFor++
			if stableFor > stabilityMinimunRequired {
				return seed
			}
		}
	}
}
func (rep *Reporter) init(ctx context.Context) {
@ -161,6 +204,7 @@ func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) b
readingErr = 0
}
if continueFn == nil || continueFn(err) {
backoff.Wait()
continue
}
return nil, err

@ -21,6 +21,8 @@ import (
var metrics = storage.NewClientMetrics()
func Test_LeaderElection(t *testing.T) {
stabilityCheckInterval = 100 * time.Millisecond
result := make(chan *ClusterSeed, 10)
objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
FSConfig: local.FSConfig{
@ -71,6 +73,7 @@ func Test_ReportLoop(t *testing.T) {
// stub
reportCheckInterval = 100 * time.Millisecond
reportInterval = time.Second
stabilityCheckInterval = 100 * time.Millisecond
totalReport := 0
clusterIDs := []string{}
@ -94,12 +97,11 @@ func Test_ReportLoop(t *testing.T) {
Store: "inmemory",
}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)
r.initLeader(context.Background())
ctx, cancel := context.WithCancel(context.Background())
r.initLeader(ctx)
go func() {
<-time.After(6 * time.Second)
<-time.After(6*time.Second + (stabilityCheckInterval * time.Duration(stabilityMinimunRequired+1)))
cancel()
}()
require.Equal(t, context.Canceled, r.running(ctx))

@ -1,18 +1,69 @@
package usagestats
import (
"fmt"
"time"
jsoniter "github.com/json-iterator/go"
prom "github.com/prometheus/prometheus/web/api/v1"
"github.com/grafana/dskit/kv/memberlist"
)
// ClusterSeed is the seed for the usage stats.
// A unique ID is generated for each cluster.
type ClusterSeed struct {
	// UID is the unique identifier of this cluster.
	UID string `json:"UID"`
	// CreatedAt is when this seed was created; Merge uses it to keep the
	// oldest seed when two seeds conflict.
	CreatedAt time.Time `json:"created_at"`
	// PrometheusVersion carries build/version information reported with the seed.
	prom.PrometheusVersion `json:"version"`
}
// Merge implements the memberlist.Mergeable interface.
// It merges two cluster seeds so that every member deterministically
// converges on the oldest one:
//   - if the receiver is strictly older, nothing changes and no change is returned;
//   - if both seeds were created at the same instant, the smaller UID (string
//     comparison) wins, to guarantee a stable tie-break;
//   - otherwise the receiver adopts the other seed and returns it as the change.
func (c *ClusterSeed) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) {
	if mergeable == nil {
		return nil, nil
	}
	other, ok := mergeable.(*ClusterSeed)
	if !ok {
		return nil, fmt.Errorf("expected *usagestats.ClusterSeed, got %T", mergeable)
	}
	if other == nil {
		return nil, nil
	}
	// If we (c) already have the oldest key, then we should not request a change.
	if c.CreatedAt.Before(other.CreatedAt) {
		return nil, nil
	}
	// time.Time must be compared with Equal, not ==: two equal instants can
	// differ in monotonic-clock/location fields, and == would miss the tie,
	// making members keep swapping seeds instead of converging on one.
	if c.CreatedAt.Equal(other.CreatedAt) {
		// Same creation date but a different key: take the smallest UID using
		// string alphabetical comparison to ensure stability.
		if c.UID > other.UID {
			*c = *other
			return other, nil
		}
		return nil, nil
	}
	// Our seed is not the oldest, so we should request a change.
	*c = *other
	return other, nil
}
// MergeContent returns the key(s) whose content this mergeable carries —
// here, the cluster UID. Memberlist uses this to identify the state a
// broadcast message covers.
func (c *ClusterSeed) MergeContent() []string {
	return []string{c.UID}
}
// RemoveTombstones is not required for usagestats: the seed never produces
// tombstones, so there is nothing to remove and both counters are zero.
func (c *ClusterSeed) RemoveTombstones(limit time.Time) (total, removed int) {
	return 0, 0
}
// Clone implements the memberlist.Mergeable interface.
// It returns an independent copy of the seed; all fields are value types,
// so a shallow copy is sufficient.
func (c *ClusterSeed) Clone() memberlist.Mergeable {
	// Named `clone` rather than `new` to avoid shadowing the builtin.
	clone := *c
	return &clone
}
// JSONCodec is the codec used to (de)serialize the cluster seed for the kv
// store (the file imports json-iterator for the encoding).
var JSONCodec = jsonCodec{}

// jsonCodec is the JSON-based implementation backing JSONCodec.
type jsonCodec struct{}

@ -0,0 +1,103 @@
package usagestats
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/storage"
)
type dnsProviderMock struct {
resolved []string
}
func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error {
p.resolved = addrs
return nil
}
func (p dnsProviderMock) Addresses() []string {
return p.resolved
}
// createMemberlist builds and starts a memberlist KV instance for tests.
// port is an existing member's gossip port to join (0 starts a fresh
// cluster); memberID is only used to give the node a unique name.
// The instance is stopped automatically via t.Cleanup.
func createMemberlist(t *testing.T, port, memberID int) *memberlist.KV {
	t.Helper()
	var cfg memberlist.KVConfig
	flagext.DefaultValues(&cfg)
	cfg.TCPTransport = memberlist.TCPTransportConfig{
		BindAddrs: []string{"localhost"},
		BindPort:  0, // pick any free port
	}
	cfg.GossipInterval = 100 * time.Millisecond
	cfg.GossipNodes = 3
	cfg.PushPullInterval = 5 * time.Second
	cfg.NodeName = fmt.Sprintf("Member-%d", memberID)
	cfg.Codecs = []codec.Codec{JSONCodec}
	mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, nil)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
	if port != 0 {
		_, err := mkv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", port)})
		// memberID is an int, so the message needs %d; with %s it would
		// render as %!s(int=N).
		require.NoError(t, err, "member %d failed to join the cluster: %v", memberID, err)
	}
	t.Cleanup(func() {
		_ = services.StopAndAwaitTerminated(context.TODO(), mkv)
	})
	return mkv
}
// Test_Memberlist verifies that ten reporters joined to the same memberlist
// cluster all converge on a single cluster seed UID.
func Test_Memberlist(t *testing.T) {
	stabilityCheckInterval = time.Second

	objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
		FSConfig: local.FSConfig{
			Directory: t.TempDir(),
		},
	}, metrics)
	require.NoError(t, err)
	result := make(chan *ClusterSeed, 10)

	// Create a first memberlist so the others have a valid port to join.
	initMKV := createMemberlist(t, 0, -1)
	for i := 0; i < 10; i++ {
		go func(i int) {
			leader, err := NewReporter(Config{
				Leader: true,
			}, kv.Config{
				Store: "memberlist",
				StoreConfig: kv.StoreConfig{
					MemberlistKV: func() (*memberlist.KV, error) {
						return createMemberlist(t, initMKV.GetListeningPort(), i), nil
					},
				},
			}, objectClient, log.NewLogfmtLogger(os.Stdout), nil)
			if err != nil {
				// require (t.FailNow) must only be called from the test
				// goroutine; record the failure and send nil so the
				// receiving loop below fails the test deterministically.
				t.Error(err)
				result <- nil
				return
			}
			leader.init(context.Background())
			result <- leader.cluster
		}(i)
	}

	var uids []string
	for i := 0; i < 10; i++ {
		cluster := <-result
		require.NotNil(t, cluster)
		uids = append(uids, cluster.UID)
	}
	first := uids[0]
	for _, uid := range uids {
		require.Equal(t, first, uid)
	}
}

@ -35,6 +35,7 @@ type Report struct {
ClusterID string `json:"clusterID"`
CreatedAt time.Time `json:"createdAt"`
Interval time.Time `json:"interval"`
IntervalPeriod float64 `json:"intervalPeriod"`
Target string `json:"target"`
prom.PrometheusVersion `json:"version"`
Os string `json:"os"`
@ -92,6 +93,7 @@ func buildReport(seed *ClusterSeed, interval time.Time) Report {
PrometheusVersion: build.GetVersion(),
CreatedAt: seed.CreatedAt,
Interval: interval,
IntervalPeriod: reportInterval.Seconds(),
Os: runtime.GOOS,
Arch: runtime.GOARCH,
Target: targetName,

Loading…
Cancel
Save