mirror of https://github.com/grafana/loki
Migration to dskit/ring (#4641)
parent
2ac409c23e
commit
2d24e2ea64
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
2
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go
generated
vendored
2
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go
generated
vendored
4
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go
generated
vendored
4
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go
generated
vendored
2
vendor/github.com/cortexproject/cortex/pkg/querier/worker/scheduler_processor.go
generated
vendored
2
vendor/github.com/cortexproject/cortex/pkg/querier/worker/scheduler_processor.go
generated
vendored
@ -1,91 +1,15 @@ |
||||
package log |
||||
|
||||
import ( |
||||
"encoding" |
||||
"encoding/json" |
||||
"fmt" |
||||
"io" |
||||
"reflect" |
||||
) |
||||
|
||||
type jsonLogger struct { |
||||
io.Writer |
||||
} |
||||
"github.com/go-kit/log" |
||||
) |
||||
|
||||
// NewJSONLogger returns a Logger that encodes keyvals to the Writer as a
|
||||
// single JSON object. Each log event produces no more than one call to
|
||||
// w.Write. The passed Writer must be safe for concurrent use by multiple
|
||||
// goroutines if the returned Logger will be used concurrently.
|
||||
func NewJSONLogger(w io.Writer) Logger { |
||||
return &jsonLogger{w} |
||||
} |
||||
|
||||
func (l *jsonLogger) Log(keyvals ...interface{}) error { |
||||
n := (len(keyvals) + 1) / 2 // +1 to handle case when len is odd
|
||||
m := make(map[string]interface{}, n) |
||||
for i := 0; i < len(keyvals); i += 2 { |
||||
k := keyvals[i] |
||||
var v interface{} = ErrMissingValue |
||||
if i+1 < len(keyvals) { |
||||
v = keyvals[i+1] |
||||
} |
||||
merge(m, k, v) |
||||
} |
||||
enc := json.NewEncoder(l.Writer) |
||||
enc.SetEscapeHTML(false) |
||||
return enc.Encode(m) |
||||
} |
||||
|
||||
func merge(dst map[string]interface{}, k, v interface{}) { |
||||
var key string |
||||
switch x := k.(type) { |
||||
case string: |
||||
key = x |
||||
case fmt.Stringer: |
||||
key = safeString(x) |
||||
default: |
||||
key = fmt.Sprint(x) |
||||
} |
||||
|
||||
// We want json.Marshaler and encoding.TextMarshaller to take priority over
|
||||
// err.Error() and v.String(). But json.Marshall (called later) does that by
|
||||
// default so we force a no-op if it's one of those 2 case.
|
||||
switch x := v.(type) { |
||||
case json.Marshaler: |
||||
case encoding.TextMarshaler: |
||||
case error: |
||||
v = safeError(x) |
||||
case fmt.Stringer: |
||||
v = safeString(x) |
||||
} |
||||
|
||||
dst[key] = v |
||||
} |
||||
|
||||
func safeString(str fmt.Stringer) (s string) { |
||||
defer func() { |
||||
if panicVal := recover(); panicVal != nil { |
||||
if v := reflect.ValueOf(str); v.Kind() == reflect.Ptr && v.IsNil() { |
||||
s = "NULL" |
||||
} else { |
||||
panic(panicVal) |
||||
} |
||||
} |
||||
}() |
||||
s = str.String() |
||||
return |
||||
} |
||||
|
||||
func safeError(err error) (s interface{}) { |
||||
defer func() { |
||||
if panicVal := recover(); panicVal != nil { |
||||
if v := reflect.ValueOf(err); v.Kind() == reflect.Ptr && v.IsNil() { |
||||
s = nil |
||||
} else { |
||||
panic(panicVal) |
||||
} |
||||
} |
||||
}() |
||||
s = err.Error() |
||||
return |
||||
return log.NewJSONLogger(w) |
||||
} |
||||
|
||||
@ -1,62 +1,15 @@ |
||||
package log |
||||
|
||||
import ( |
||||
"bytes" |
||||
"io" |
||||
"sync" |
||||
|
||||
"github.com/go-logfmt/logfmt" |
||||
"github.com/go-kit/log" |
||||
) |
||||
|
||||
type logfmtEncoder struct { |
||||
*logfmt.Encoder |
||||
buf bytes.Buffer |
||||
} |
||||
|
||||
func (l *logfmtEncoder) Reset() { |
||||
l.Encoder.Reset() |
||||
l.buf.Reset() |
||||
} |
||||
|
||||
var logfmtEncoderPool = sync.Pool{ |
||||
New: func() interface{} { |
||||
var enc logfmtEncoder |
||||
enc.Encoder = logfmt.NewEncoder(&enc.buf) |
||||
return &enc |
||||
}, |
||||
} |
||||
|
||||
type logfmtLogger struct { |
||||
w io.Writer |
||||
} |
||||
|
||||
// NewLogfmtLogger returns a logger that encodes keyvals to the Writer in
|
||||
// logfmt format. Each log event produces no more than one call to w.Write.
|
||||
// The passed Writer must be safe for concurrent use by multiple goroutines if
|
||||
// the returned Logger will be used concurrently.
|
||||
func NewLogfmtLogger(w io.Writer) Logger { |
||||
return &logfmtLogger{w} |
||||
} |
||||
|
||||
func (l logfmtLogger) Log(keyvals ...interface{}) error { |
||||
enc := logfmtEncoderPool.Get().(*logfmtEncoder) |
||||
enc.Reset() |
||||
defer logfmtEncoderPool.Put(enc) |
||||
|
||||
if err := enc.EncodeKeyvals(keyvals...); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Add newline to the end of the buffer
|
||||
if err := enc.EndRecord(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// The Logger interface requires implementations to be safe for concurrent
|
||||
// use by multiple goroutines. For this implementation that means making
|
||||
// only one call to l.w.Write() for each call to Log.
|
||||
if _, err := l.w.Write(enc.buf.Bytes()); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
return log.NewLogfmtLogger(w) |
||||
} |
||||
|
||||
@ -1,8 +1,8 @@ |
||||
package log |
||||
|
||||
type nopLogger struct{} |
||||
import "github.com/go-kit/log" |
||||
|
||||
// NewNopLogger returns a logger that doesn't do anything.
|
||||
func NewNopLogger() Logger { return nopLogger{} } |
||||
|
||||
func (nopLogger) Log(...interface{}) error { return nil } |
||||
func NewNopLogger() Logger { |
||||
return log.NewNopLogger() |
||||
} |
||||
|
||||
@ -0,0 +1,40 @@ |
||||
package ring |
||||
|
||||
import ( |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promauto" |
||||
) |
||||
|
||||
type LifecyclerMetrics struct { |
||||
consulHeartbeats prometheus.Counter |
||||
tokensOwned prometheus.Gauge |
||||
tokensToOwn prometheus.Gauge |
||||
shutdownDuration *prometheus.HistogramVec |
||||
} |
||||
|
||||
func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics { |
||||
return &LifecyclerMetrics{ |
||||
consulHeartbeats: promauto.With(reg).NewCounter(prometheus.CounterOpts{ |
||||
Name: "member_consul_heartbeats_total", |
||||
Help: "The total number of heartbeats sent to consul.", |
||||
ConstLabels: prometheus.Labels{"name": ringName}, |
||||
}), |
||||
tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ |
||||
Name: "member_ring_tokens_owned", |
||||
Help: "The number of tokens owned in the ring.", |
||||
ConstLabels: prometheus.Labels{"name": ringName}, |
||||
}), |
||||
tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ |
||||
Name: "member_ring_tokens_to_own", |
||||
Help: "The number of tokens to own in the ring.", |
||||
ConstLabels: prometheus.Labels{"name": ringName}, |
||||
}), |
||||
shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ |
||||
Name: "shutdown_duration_seconds", |
||||
Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).", |
||||
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(9-1) = 2560, or 42 mins.
|
||||
ConstLabels: prometheus.Labels{"name": ringName}, |
||||
}, []string{"op", "status"}), |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,45 @@ |
||||
package shard |
||||
|
||||
import ( |
||||
"crypto/md5" |
||||
"encoding/binary" |
||||
"math" |
||||
"unsafe" |
||||
) |
||||
|
||||
var ( |
||||
seedSeparator = []byte{0} |
||||
) |
||||
|
||||
// ShuffleShardSeed returns seed for random number generator, computed from provided identifier.
|
||||
func ShuffleShardSeed(identifier, zone string) int64 { |
||||
// Use the identifier to compute a hash we'll use to seed the random.
|
||||
hasher := md5.New() |
||||
hasher.Write(yoloBuf(identifier)) // nolint:errcheck
|
||||
if zone != "" { |
||||
hasher.Write(seedSeparator) // nolint:errcheck
|
||||
hasher.Write(yoloBuf(zone)) // nolint:errcheck
|
||||
} |
||||
checksum := hasher.Sum(nil) |
||||
|
||||
// Generate the seed based on the first 64 bits of the checksum.
|
||||
return int64(binary.BigEndian.Uint64(checksum)) |
||||
} |
||||
|
||||
// ShuffleShardExpectedInstancesPerZone returns the number of instances that should be selected for each
|
||||
// zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible
|
||||
// by the number of zones, in order to have nodes balanced across zones. If it's not, we do round up.
|
||||
func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int { |
||||
return int(math.Ceil(float64(shardSize) / float64(numZones))) |
||||
} |
||||
|
||||
// ShuffleShardExpectedInstances returns the total number of instances that should be selected for a given
|
||||
// tenant. If zone-aware replication is disabled, the input numZones should be 1.
|
||||
func ShuffleShardExpectedInstances(shardSize, numZones int) int { |
||||
return ShuffleShardExpectedInstancesPerZone(shardSize, numZones) * numZones |
||||
} |
||||
|
||||
// yoloBuf will return an unsafe pointer to a string, as the name yolo.yoloBuf implies use at your own risk.
|
||||
func yoloBuf(s string) []byte { |
||||
return *((*[]byte)(unsafe.Pointer(&s))) |
||||
} |
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue