limits: limits implementation for loki (#948)

* limits implementation for loki

Added the following new limits:

Length of query
Number of active streams
Number of stream matchers per query
Ingestion rate

* periodically reload ingestion rate limits to apply latest rates from overrides
pull/1043/head
Sandeep Sukhani 6 years ago committed by GitHub
parent b30c0d31bf
commit ec5bc7054d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      Gopkg.lock
  2. 3
      cmd/loki/main.go
  3. 98
      pkg/distributor/distributor.go
  4. 55
      pkg/distributor/distributor_test.go
  5. 13
      pkg/ingester/flush_test.go
  6. 9
      pkg/ingester/ingester.go
  7. 62
      pkg/ingester/ingester_test.go
  8. 40
      pkg/ingester/instance.go
  9. 10
      pkg/loki/loki.go
  10. 18
      pkg/loki/modules.go
  11. 80
      pkg/querier/querier.go
  12. 80
      pkg/querier/querier_test.go
  13. 10
      pkg/storage/hack/main.go
  14. 9
      pkg/storage/store.go
  15. 8
      pkg/storage/store_test.go
  16. 226
      pkg/util/validation/limits.go
  17. 24
      pkg/util/validation/validate.go
  18. 4
      vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go
  19. 11
      vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go
  20. 3
      vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
  21. 5
      vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go
  22. 10
      vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go
  23. 10
      vendor/github.com/cortexproject/cortex/pkg/ring/http.go
  24. 60
      vendor/github.com/cortexproject/cortex/pkg/util/backoff.go
  25. 7
      vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go
  26. 85
      vendor/github.com/prometheus/procfs/arp.go
  27. 131
      vendor/github.com/prometheus/procfs/crypto.go
  28. 1578
      vendor/github.com/prometheus/procfs/fixtures.ttar
  29. 88
      vendor/github.com/prometheus/procfs/internal/util/parse.go
  30. 45
      vendor/github.com/prometheus/procfs/internal/util/sysreadfile.go
  31. 26
      vendor/github.com/prometheus/procfs/internal/util/sysreadfile_compat.go
  32. 77
      vendor/github.com/prometheus/procfs/internal/util/valueparser.go
  33. 91
      vendor/github.com/prometheus/procfs/net_softnet.go
  34. 30
      vendor/github.com/prometheus/procfs/proc.go
  35. 132
      vendor/github.com/prometheus/procfs/proc_fdinfo.go
  36. 118
      vendor/github.com/prometheus/procfs/schedstat.go
  37. 210
      vendor/github.com/prometheus/procfs/vm.go
  38. 196
      vendor/github.com/prometheus/procfs/zoneinfo.go
  39. 12
      vendor/go.etcd.io/etcd/raft/confchange/confchange.go
  40. 6
      vendor/go.etcd.io/etcd/raft/log_unstable.go
  41. 46
      vendor/go.etcd.io/etcd/raft/raft.go
  42. 4
      vendor/go.etcd.io/etcd/raft/util.go
  43. 2
      vendor/go.etcd.io/etcd/version/version.go

21
Gopkg.lock generated

@ -245,7 +245,7 @@
[[projects]]
branch = "master"
digest = "1:fdc932ff5ac5519eb816057507cb79a6addb23c722c03cfeec05aed44b53c96f"
digest = "1:573b3a3fbdc3f803b40d99f7a917db72b96ef00f5da75641cbfb01e9e5eec1c9"
name = "github.com/cortexproject/cortex"
packages = [
"pkg/chunk",
@ -275,7 +275,7 @@
"pkg/util/validation",
]
pruneopts = "UT"
revision = "934998160dbec7322c1ddfd70342a7aca68177f2"
revision = "68603f14e3e4cdbc0b90ff5af4f544f2c6e457fa"
[[projects]]
digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"
@ -974,16 +974,15 @@
revision = "2998b132700a7d019ff618c06a234b47c1f3f681"
[[projects]]
digest = "1:8232537905152d6a0b116b9af5a0868fcac0e84eb02ec5a150624c077bdedb0b"
digest = "1:366f5aa02ff6c1e2eccce9ca03a22a6d983da89eecff8a89965401764534eb7c"
name = "github.com/prometheus/procfs"
packages = [
".",
"internal/fs",
"internal/util",
]
pruneopts = "UT"
revision = "00ec24a6a2d86e7074629c8384715dbb05adccd8"
version = "v0.0.4"
revision = "3f98efb27840a48a7a2898ec80be07674d19f9c8"
version = "v0.0.3"
[[projects]]
branch = "master"
@ -1222,7 +1221,7 @@
version = "v1.3.3"
[[projects]]
digest = "1:9a09f4299b5d5546c45b892c8f8ec733d7e7d4d113b4f6aa620f6b9ac5dede6b"
digest = "1:6b279e7b04444101c8878335e7671597f76ae87ba767405a45d0dec949b97344"
name = "go.etcd.io/etcd"
packages = [
"auth",
@ -1303,8 +1302,8 @@
"wal/walpb",
]
pruneopts = "UT"
revision = "67d0c21bb04c19ef1c76c63549c776bde3d3ee90"
version = "v3.4.0-rc.2"
revision = "8f85f0dc2607fcfb3ec8d812ecf52eb391fb2f74"
version = "v3.4.0-rc.1"
[[projects]]
digest = "1:86b11d1e4dd05cd44d14b1e30b0497f98a37f696e8740ae88383de56d766cb34"
@ -1834,9 +1833,10 @@
"github.com/cortexproject/cortex/pkg/ingester/client",
"github.com/cortexproject/cortex/pkg/ingester/index",
"github.com/cortexproject/cortex/pkg/ring",
"github.com/cortexproject/cortex/pkg/ring/kv",
"github.com/cortexproject/cortex/pkg/ring/kv/codec",
"github.com/cortexproject/cortex/pkg/util",
"github.com/cortexproject/cortex/pkg/util/flagext",
"github.com/cortexproject/cortex/pkg/util/grpcclient",
"github.com/cortexproject/cortex/pkg/util/spanlogger",
"github.com/cortexproject/cortex/pkg/util/validation",
"github.com/davecgh/go-spew/spew",
@ -1899,6 +1899,7 @@
"github.com/weaveworks/common/tracing",
"github.com/weaveworks/common/user",
"golang.org/x/net/context",
"golang.org/x/time/rate",
"google.golang.org/grpc",
"google.golang.org/grpc/codes",
"google.golang.org/grpc/health/grpc_health_v1",

@ -16,7 +16,8 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/util/validation"
)
func init() {

@ -4,27 +4,34 @@ import (
"context"
"flag"
"net/http"
"sync"
"sync/atomic"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
const metricName = "logs"
const (
metricName = "logs"
bytesInMB = 1048576
)
var readinessProbeSuccess = []byte("Ready")
var (
@ -55,10 +62,13 @@ var (
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)
LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period"`
}
// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
}
// Distributor coordinates replication and distribution of log streams.
@ -68,6 +78,10 @@ type Distributor struct {
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool
ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
quit chan struct{}
}
// New creates a distributor.
@ -79,13 +93,44 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *val
}
}
return &Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
}, nil
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
}
go d.loop()
return &d, nil
}
// loop runs until the quit channel is closed. On every tick of
// cfg.LimiterReloadPeriod it replaces the cached per-user ingestion limiters
// with an empty map, so the next push for each user builds a fresh limiter
// from the overrides in effect at that time.
//
// A non-positive reload period disables the periodic reset entirely.
func (d *Distributor) loop() {
	// time.NewTicker panics on a non-positive duration, so a zero or negative
	// period is treated as "never reload" instead of being passed through.
	if d.cfg.LimiterReloadPeriod <= 0 {
		return
	}

	ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			d.ingestLimitersMtx.Lock()
			// Size the new map like the old one: roughly the same set of
			// users is expected to show up again.
			d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
			d.ingestLimitersMtx.Unlock()
		case <-d.quit:
			return
		}
	}
}
// Stop closes the quit channel, which makes the background limiter-reload
// loop return. It must be called at most once: closing an already-closed
// channel panics.
func (d *Distributor) Stop() {
close(d.quit)
}
// TODO taken from Cortex, see if we can refactor out an usable interface.
@ -145,6 +190,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
validatedSamplesSize := 0
for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
@ -153,13 +200,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
if err := cortex_validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
validationErr = err
continue
}
entries = append(entries, entry)
validatedSamplesSize += len(entry.Line)
}
if len(entries) == 0 {
@ -176,6 +224,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}
limiter := d.getOrCreateIngestLimiter(userID)
if !limiter.AllowN(time.Now(), validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), lineCount)
}
replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
@ -226,7 +282,7 @@ func (d *Distributor) validateLabels(userID, labels string) error {
}
// everything in `ValidateLabels` returns `httpgrpc.Errorf` errors, no sugaring needed
return validation.ValidateLabels(d.overrides, userID, ls)
return cortex_validation.ValidateLabels(d.overrides, userID, ls)
}
// TODO taken from Cortex, see if we can refactor out an usable interface.
@ -287,3 +343,21 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}
// getOrCreateIngestLimiter returns the ingestion rate limiter cached for
// userID, building and caching one when none exists yet. The limiter's rate
// and burst are the user's IngestionRate and IngestionBurstSize overrides
// multiplied by bytesInMB.
//
// All concurrent callers for the same user receive the same limiter instance.
func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
	// Fast path: most calls find an existing limiter under the read lock.
	d.ingestLimitersMtx.RLock()
	limiter, ok := d.ingestLimiters[userID]
	d.ingestLimitersMtx.RUnlock()
	if ok {
		return limiter
	}

	// Build the candidate outside the write lock so the overrides lookup does
	// not run while the limiter map is exclusively locked.
	created := rate.NewLimiter(rate.Limit(int64(d.overrides.IngestionRate(userID)*bytesInMB)), int(d.overrides.IngestionBurstSize(userID)*bytesInMB))

	d.ingestLimitersMtx.Lock()
	defer d.ingestLimitersMtx.Unlock()

	// Re-check under the write lock: another goroutine may have stored a
	// limiter for this user since the read lock was released. Overwriting it
	// would leave that goroutine rate-limiting against an orphaned limiter.
	if existing, found := d.ingestLimiters[userID]; found {
		return existing
	}
	d.ingestLimiters[userID] = created
	return created
}

@ -3,25 +3,63 @@ package distributor
import (
"context"
"fmt"
"net/http"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)
const (
numIngesters = 5
ingestionRateLimit = 0.000096 // 100 Bytes/s limit
)
const numIngesters = 5
var (
success = &logproto.PushResponse{}
ctx = user.InjectOrgID(context.Background(), "test")
)
// TestDistributor pushes write requests of different sizes through a freshly
// prepared Distributor and checks the response and error of each push: the
// small request must succeed, the large one must be rejected with a
// 429 ingestion-rate-limit error.
func TestDistributor(t *testing.T) {
	for i, tc := range []struct {
		samples          int
		expectedResponse *logproto.PushResponse
		expectedError    error
	}{
		{
			samples:          10,
			expectedResponse: success,
		},
		{
			samples:       100,
			expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100) exceeded while adding 100 lines"),
		},
	} {
		t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.samples), func(t *testing.T) {
			d := prepare(t)
			// New starts the limiter-reload goroutine; stop it so each
			// subtest does not leak one.
			defer d.Stop()

			request := makeWriteRequest(tc.samples)
			response, err := d.Push(ctx, request)
			assert.Equal(t, tc.expectedResponse, response)
			assert.Equal(t, tc.expectedError, err)
		})
	}
}
func prepare(t *testing.T) *Distributor {
var (
distributorConfig Config
defaultLimits validation.Limits
@ -29,6 +67,8 @@ func TestDistributor(t *testing.T) {
)
flagext.DefaultValues(&distributorConfig, &defaultLimits, &clientConfig)
defaultLimits.EnforceMetricName = false
defaultLimits.IngestionRate = ingestionRateLimit
defaultLimits.IngestionBurstSize = ingestionRateLimit
limits, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)
@ -54,6 +94,10 @@ func TestDistributor(t *testing.T) {
d, err := New(distributorConfig, clientConfig, r, limits)
require.NoError(t, err)
return d
}
func makeWriteRequest(samples int) *logproto.PushRequest {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
@ -61,15 +105,14 @@ func TestDistributor(t *testing.T) {
},
},
}
for i := 0; i < 10; i++ {
for i := 0; i < samples; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}
_, err = d.Push(user.InjectOrgID(context.Background(), "test"), &req)
require.NoError(t, err)
return &req
}
type mockIngester struct {

@ -7,16 +7,18 @@ import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
@ -65,7 +67,10 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
chunks: map[string][]chunk.Chunk{},
}
ing, err := New(cfg, client.Config{}, store)
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
ing, err := New(cfg, client.Config{}, store, limits)
require.NoError(t, err)
return store, ing

@ -17,8 +17,10 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)
// ErrReadOnly is returned when the ingester is shutting down and a push was
@ -83,6 +85,8 @@ type Ingester struct {
// pick a queue.
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup
limits *validation.Overrides
}
// ChunkStore is the interface we need to store chunks.
@ -91,7 +95,7 @@ type ChunkStore interface {
}
// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@ -104,6 +108,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore) (*Ingester, e
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
limits: limits,
}
i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
@ -181,7 +186,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.cfg.BlockSize)
inst = newInstance(instanceID, i.cfg.BlockSize, i.limits)
i.instances[instanceID] = inst
}
return inst

@ -2,27 +2,35 @@ package ingester
import (
"fmt"
"net/http"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)
func TestIngester(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store)
i, err := New(ingesterConfig, client.Config{}, store, limits)
require.NoError(t, err)
defer i.Shutdown()
@ -93,6 +101,48 @@ func TestIngester(t *testing.T) {
require.Equal(t, `{bar="baz2", foo="bar"}`, result.resps[0].Streams[0].Labels)
}
// TestIngesterStreamLimitExceeded configures a per-user limit of one stream,
// pushes entries for a first stream (which must succeed) and then pushes the
// same entries under a different label set, expecting a 429 response because
// a second stream would exceed the limit.
func TestIngesterStreamLimitExceeded(t *testing.T) {
	ingesterConfig := defaultIngesterTestConfig(t)
	defaultLimits := defaultLimitsTestConfig()
	defaultLimits.MaxStreamsPerUser = 1
	overrides, err := validation.NewOverrides(defaultLimits)
	require.NoError(t, err)

	store := &mockStore{
		chunks: map[string][]chunk.Chunk{},
	}

	i, err := New(ingesterConfig, client.Config{}, store, overrides)
	require.NoError(t, err)
	defer i.Shutdown()

	req := logproto.PushRequest{
		Streams: []*logproto.Stream{
			{
				Labels: `{foo="bar",bar="baz1"}`,
			},
		},
	}
	// Use a loop index that does not shadow the ingester variable i.
	for n := 0; n < 10; n++ {
		req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
			Timestamp: time.Unix(0, 0),
			Line:      fmt.Sprintf("line %d", n),
		})
	}

	ctx := user.InjectOrgID(context.Background(), "test")

	// First stream fits within the limit.
	_, err = i.Push(ctx, &req)
	require.NoError(t, err)

	// A different label set is a new stream and must be rejected.
	req.Streams[0].Labels = `{foo="bar",bar="baz2"}`
	_, err = i.Push(ctx, &req)
	if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests {
		t.Fatalf("expected error about exceeding streams per user, got %v", err)
	}
}
type mockStore struct {
mtx sync.Mutex
chunks map[string][]chunk.Chunk
@ -125,3 +175,9 @@ func (m *mockQuerierServer) Send(resp *logproto.QueryResponse) error {
func (m *mockQuerierServer) Context() context.Context {
return m.ctx
}
// defaultLimitsTestConfig returns a validation.Limits value populated by
// flagext.DefaultValues, for use as a baseline in tests.
func defaultLimitsTestConfig() validation.Limits {
	var cfg validation.Limits
	flagext.DefaultValues(&cfg)
	return cfg
}

@ -2,6 +2,7 @@ package ingester
import (
"context"
"net/http"
"sync"
"github.com/pkg/errors"
@ -9,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
@ -19,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
const queryBatchSize = 128
@ -54,9 +57,11 @@ type instance struct {
blockSize int
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
limits *validation.Overrides
}
func newInstance(instanceID string, blockSize int) *instance {
func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance {
return &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@ -67,6 +72,7 @@ func newInstance(instanceID string, blockSize int) *instance {
blockSize: blockSize,
tailers: map[uint32]*tailer{},
limits: limits,
}
}
@ -101,14 +107,10 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
continue
}
fp := client.FastFingerprint(labels)
stream, ok := i.streams[fp]
if !ok {
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
stream, err := i.getOrCreateStream(labels)
if err != nil {
appendErr = err
continue
}
if err := stream.Push(ctx, s.Entries); err != nil {
@ -120,6 +122,26 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}
// getOrCreateStream looks up the stream whose fingerprint matches labels and
// returns it when present. Otherwise it creates the stream, adds it to the
// index and the streams map, increments the created-streams counter and
// attaches tailers to it.
//
// If the instance already holds at least MaxStreamsPerUser streams, no stream
// is created and a 429 (Too Many Requests) httpgrpc error is returned.
//
// NOTE(review): no locking is done here; presumably the caller guards
// i.streams — confirm against Push.
func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
	fingerprint := client.FastFingerprint(labels)

	if existing, found := i.streams[fingerprint]; found {
		return existing, nil
	}

	// Only brand-new streams count against the per-user limit.
	if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
	}

	created := newStream(fingerprint, labels, i.blockSize)
	i.index.Add(labels, fingerprint)
	i.streams[fingerprint] = created
	i.streamsCreatedTotal.Inc()
	i.addTailersToNewStream(created)

	return created, nil
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {

@ -4,22 +4,22 @@ import (
"flag"
"fmt"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/validation"
)
// Config is the root config for Loki.

@ -7,22 +7,22 @@ import (
"strings"
"time"
"github.com/go-kit/kit/log/level"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/validation"
)
const maxChunkAgeForTableManager = 12 * time.Hour
@ -133,8 +133,13 @@ func (t *Loki) initDistributor() (err error) {
return
}
// stopDistributor calls Stop on the distributor. It always returns nil.
func (t *Loki) stopDistributor() (err error) {
t.distributor.Stop()
return nil
}
func (t *Loki) initQuerier() (err error) {
t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store)
t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
if err != nil {
return
}
@ -159,7 +164,7 @@ func (t *Loki) initQuerier() (err error) {
func (t *Loki) initIngester() (err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store)
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides)
if err != nil {
return
}
@ -319,6 +324,7 @@ var modules = map[moduleName]module{
Distributor: {
deps: []moduleName{Ring, Server, Overrides},
init: (*Loki).initDistributor,
stop: (*Loki).stopDistributor,
},
Store: {

@ -6,18 +6,23 @@ import (
"net/http"
"time"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/validation"
)
const (
@ -49,26 +54,28 @@ type Querier struct {
pool *cortex_client.Pool
store storage.Store
engine *logql.Engine
limits *validation.Overrides
}
// New makes a new Querier.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store) (*Querier, error) {
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
return newQuerier(cfg, clientCfg, factory, ring, store)
return newQuerier(cfg, clientCfg, factory, ring, store, limits)
}
// newQuerier creates a new Querier and allows to pass a custom ingester client factory
// used for testing purposes
func newQuerier(cfg Config, clientCfg client.Config, clientFactory cortex_client.Factory, ring ring.ReadRing, store storage.Store) (*Querier, error) {
func newQuerier(cfg Config, clientCfg client.Config, clientFactory cortex_client.Factory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
return &Querier{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
store: store,
engine: logql.NewEngine(cfg.Engine),
limits: limits,
}, nil
}
@ -144,6 +151,10 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l
// Select Implements logql.Querier which select logs via matchers and regex filters.
func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.EntryIterator, error) {
err := q.validateQueryRequest(ctx, params.QueryRequest)
if err != nil {
return nil, err
}
ingesterIterators, err := q.queryIngesters(ctx, params)
if err != nil {
@ -256,6 +267,21 @@ func mergePair(s1, s2 []string) []string {
// Tail keeps getting matching logs from all ingesters for given query
func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
histReq := logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: req.Query,
Start: req.Start,
End: time.Now(),
Limit: req.Limit,
Direction: logproto.BACKWARD,
},
}
err := q.validateQueryRequest(ctx, histReq.QueryRequest)
if err != nil {
return nil, err
}
// Enforce the query timeout except when tailing, otherwise the tailing
// will be terminated once the query timeout is reached
tailCtx := ctx
@ -274,15 +300,6 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
tailClients[clients[i].addr] = clients[i].response.(logproto.Querier_TailClient)
}
histReq := logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: req.Query,
Start: req.Start,
End: time.Now(),
Limit: req.Limit,
Direction: logproto.BACKWARD,
},
}
histIterators, err := q.Select(queryCtx, histReq)
if err != nil {
return nil, err
@ -354,3 +371,36 @@ func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.T
return reconnectClientsMap, nil
}
// validateQueryRequest checks req against the limits of the tenant found in
// ctx. It fails when the org ID cannot be extracted, when the selector does
// not parse, when the selector has more matchers than the tenant's
// MaxStreamsMatchersPerQuery (400 Bad Request), or when the time range is
// rejected by validateQueryTimeRange.
func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error {
	tenant, err := user.ExtractOrgID(ctx)
	if err != nil {
		return err
	}

	selectorMatchers, err := logql.ParseMatchers(req.Selector)
	if err != nil {
		return err
	}

	if limit := q.limits.MaxStreamsMatchersPerQuery(tenant); len(selectorMatchers) > limit {
		return httpgrpc.Errorf(http.StatusBadRequest,
			"max streams matchers per query exceeded, matchers-count > limit (%d > %d)", len(selectorMatchers), limit)
	}

	return q.validateQueryTimeRange(tenant, &req.Start, &req.End)
}
// validateQueryTimeRange rejects, with a 400 Bad Request httpgrpc error, a
// range whose end (through) lies before its start (from), and a range longer
// than the user's MaxQueryLength. A MaxQueryLength that is not positive
// disables the length check.
func (q *Querier) validateQueryTimeRange(userID string, from *time.Time, through *time.Time) error {
	start, end := *from, *through

	if end.Before(start) {
		return httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", end, start)
	}

	limit := q.limits.MaxQueryLength(userID)
	if limit <= 0 {
		// No length limit configured for this user.
		return nil
	}

	if span := end.Sub(start); span > limit {
		return httpgrpc.Errorf(http.StatusBadRequest, cortex_validation.ErrQueryTooLong, span, limit)
	}

	return nil
}

@ -2,16 +2,24 @@ package querier
import (
"context"
"net/http"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)
const (
@ -36,12 +44,15 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) {
store := newStoreMock()
store.On("LabelValuesForMetricName", mock.Anything, model.TimeFromUnixNano(startTime.UnixNano()), model.TimeFromUnixNano(endTime.UnixNano()), "logs", "test").Return([]string{"foo", "bar"}, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
q, err := newQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
store)
store, limits)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
@ -84,12 +95,15 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
q, err := newQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
store)
store, limits)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
@ -182,12 +196,15 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) {
ingesterClient := newQuerierClientMock()
ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(newTailClientMock(), nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
q, err := newQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
newReadRingMock(testData.ringIngesters),
newStoreMock())
newStoreMock(), limits)
require.NoError(t, err)
actualClients, err := q.tailDisconnectedIngesters(context.Background(), &req, testData.connectedIngestersAddr)
@ -224,3 +241,56 @@ func mockLabelResponse(values []string) *logproto.LabelResponse {
Values: values,
}
}
// defaultLimitsTestConfig returns a validation.Limits value populated by
// flagext.DefaultValues, for use as a baseline in tests.
func defaultLimitsTestConfig() validation.Limits {
	var cfg validation.Limits
	flagext.DefaultValues(&cfg)
	return cfg
}
// TestQuerier_validateQueryRequest drives Select through three requests to
// check the per-tenant query limits: a selector with too many stream matchers
// is rejected, a request inside both limits succeeds, and a time range longer
// than MaxQueryLength is rejected.
func TestQuerier_validateQueryRequest(t *testing.T) {
// Two matchers over a one-minute window. The same request value is mutated
// between the Select calls below, and the ingester mock is registered
// against its address.
request := logproto.QueryRequest{
Selector: "{type=\"test\", fail=\"yes\"}",
Limit: 10,
Start: time.Now().Add(-1 * time.Minute),
End: time.Now(),
Direction: logproto.FORWARD,
}
store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, &request, mock.Anything).Return(queryClient, nil)
// Tighten the two limits under test: one matcher per query, two minutes of range.
defaultLimits := defaultLimitsTestConfig()
defaultLimits.MaxStreamsMatchersPerQuery = 1
defaultLimits.MaxQueryLength = 2 * time.Minute
limits, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)
q, err := newQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
store, limits)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
// Case 1: two matchers against a limit of one.
_, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request})
require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "max streams matchers per query exceeded, matchers-count > limit (2 > 1)"), err)
// Case 2: a single matcher and a one-minute range are both within limits.
request.Selector = "{type=\"test\"}"
_, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request})
require.NoError(t, err)
// Case 3: stretch the range to three minutes, past the two-minute limit.
request.Start = request.End.Add(-3 * time.Minute)
_, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request})
require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, length > limit (3m0s > 2m0s)"), err)
}

@ -9,18 +9,20 @@ import (
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
lstore "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/util/validation"
)
var (

@ -4,15 +4,16 @@ import (
"context"
"flag"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
// Config is the loki storage configuration
@ -39,7 +40,7 @@ type store struct {
}
// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (Store, error) {
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) {
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits)
if err != nil {
return nil, err

@ -9,15 +9,17 @@ import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/util/validation"
)
var (

@ -0,0 +1,226 @@
package validation
import (
"flag"
"os"
"time"
"github.com/cortexproject/cortex/pkg/util/validation"
"gopkg.in/yaml.v2"
)
// Limits describe all the limits for users; can be used to describe global default
// limits via flags, or per-user limits via yaml config.
//
// Each field is bound to a command-line flag by RegisterFlags and to the yaml
// key given in its struct tag.
type Limits struct {
// Distributor enforced limits.
// IngestionRate and IngestionBurstSize are expressed in MB, as reflected in
// their yaml keys and flag names.
IngestionRate float64 `yaml:"ingestion_rate_mb"`
IngestionBurstSize float64 `yaml:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples"`
RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"`
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name"`
// Ingester enforced limits.
MaxStreamsPerUser int `yaml:"max_streams_per_user"`
// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_chunks_per_query"`
// MaxQueryLength of 0 disables the limit (see the flag help in RegisterFlags).
MaxQueryLength time.Duration `yaml:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit"`
MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"`
// Config for overrides, convenient if it goes here.
// PerTenantOverrideConfig is the overrides file name; PerTenantOverridePeriod
// is how often it is reloaded.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
// Every field of Limits is bound to one flag; the default passed to each
// registration becomes the field's value when the flag is not set.
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
// Distributor enforced limits.
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit-mb", 4, "Per-user ingestion rate limit in sample size per second. Units in MB.")
f.Float64Var(&l.IngestionBurstSize, "distributor.ingestion-burst-size-mb", 6, "Per-user allowed ingestion burst size (in sample size). Units in MB. Warning, very high limits will be reset every -distributor.limiter-reload-period.")
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names")
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Reject old samples.")
f.DurationVar(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", 14*24*time.Hour, "Maximum accepted sample age before rejecting.")
f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
// Ingester enforced limits.
f.IntVar(&l.MaxStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user.")
// Querier enforced limits.
f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
// Location and reload period of the per-user overrides file.
f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with this to reload the overrides.")
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
//
// The receiver is first seeded from the package-level defaultLimits, when that
// has been set, and is then overwritten with whatever the YAML input supplies.
// During startup defaultLimits is still nil, so the receiver's current values
// are left in place before decoding.
func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// Decoding through a locally declared type that has no methods makes
	// unmarshal fill the plain data struct instead of calling UnmarshalYAML
	// again. See prometheus/config.
	type plain Limits

	if defaults := defaultLimits; defaults != nil {
		*l = *defaults
	}
	return unmarshal((*plain)(l))
}
// When we load YAML from disk, we want the various per-customer limits
// to default to any values specified on the command line, not default
// command line values. This global contains those values. I (Tom) cannot
// find a nicer way I'm afraid.
var defaultLimits *Limits
// Overrides periodically fetch a set of per-user overrides, and provides convenience
// functions for fetching the correct value.
type Overrides struct {
overridesManager *validation.OverridesManager
}
// NewOverrides makes a new Overrides.
//
// The supplied limits are also stored in the package-level defaultLimits so
// that per-tenant limits loaded from YAML start from those values. Because
// that variable is global, the most recent call to NewOverrides decides the
// defaults for the whole process.
func NewOverrides(defaults Limits) (*Overrides, error) {
	defaultLimits = &defaults

	manager, err := validation.NewOverridesManager(validation.OverridesManagerConfig{
		OverridesReloadPeriod: defaults.PerTenantOverridePeriod,
		OverridesLoadPath:     defaults.PerTenantOverrideConfig,
		OverridesLoader:       loadOverrides,
		Defaults:              &defaults,
	})
	if err != nil {
		return nil, err
	}
	return &Overrides{overridesManager: manager}, nil
}
// Stop background reloading of overrides.
func (o *Overrides) Stop() {
o.overridesManager.Stop()
}
// IngestionRate returns the per-user ingestion rate limit (sample size per second, in MB).
func (o *Overrides) IngestionRate(userID string) float64 {
return o.overridesManager.GetLimits(userID).(*Limits).IngestionRate
}
// IngestionBurstSize returns the burst size for ingestion rate.
func (o *Overrides) IngestionBurstSize(userID string) float64 {
return o.overridesManager.GetLimits(userID).(*Limits).IngestionBurstSize
}
// MaxLabelNameLength returns maximum length a label name can be.
func (o *Overrides) MaxLabelNameLength(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNameLength
}
// MaxLabelValueLength returns maximum length a label value can be. This also is
// the maximum length of a metric name.
func (o *Overrides) MaxLabelValueLength(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelValueLength
}
// MaxLabelNamesPerSeries returns the maximum number of label names allowed per series.
func (o *Overrides) MaxLabelNamesPerSeries(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNamesPerSeries
}
// RejectOldSamples returns true when we should reject samples older than certain
// age.
func (o *Overrides) RejectOldSamples(userID string) bool {
return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamples
}
// RejectOldSamplesMaxAge returns the age at which samples should be rejected.
func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamplesMaxAge
}
// CreationGracePeriod is misnamed, and actually returns how far into the future
// we should accept samples.
func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).CreationGracePeriod
}
// MaxStreamsPerUser returns the maximum number of streams a user is allowed to store.
func (o *Overrides) MaxStreamsPerUser(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxStreamsPerUser
}
// MaxChunksPerQuery returns the maximum number of chunks allowed per query.
func (o *Overrides) MaxChunksPerQuery(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxChunksPerQuery
}
// MaxQueryLength returns the limit of the length (in time) of a query.
func (o *Overrides) MaxQueryLength(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryLength
}
// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryParallelism
}
// EnforceMetricName whether to enforce the presence of a metric name.
func (o *Overrides) EnforceMetricName(userID string) bool {
return o.overridesManager.GetLimits(userID).(*Limits).EnforceMetricName
}
// CardinalityLimit returns the cardinality limit for index queries.
func (o *Overrides) CardinalityLimit(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).CardinalityLimit
}
// MaxStreamsMatchersPerQuery returns the limit to number of streams matchers per query.
func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxStreamsMatchersPerQuery
}
// loadOverrides reads the per-user overrides file at filename and returns the
// decoded limits keyed by user ID, boxed as interface{} so that
// OverridesManager can store them.
// We need to implement it here since OverridesManager must store type Limits in an interface but
// it doesn't know its definition to initialize it.
// We could have used yamlv3.Node for this but there is no way to enforce strict decoding due to a bug in it
// TODO: Use yamlv3.Node to move this to OverridesManager after https://github.com/go-yaml/yaml/issues/460 is fixed
//
// Decoding is strict. An error is returned when the file cannot be opened or
// when decoding fails.
func loadOverrides(filename string) (map[string]interface{}, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	// This loader is handed to the overrides manager together with a reload
	// period, so a handle left open here would leak on every reload. The file
	// is only read, so the error from Close is not worth surfacing.
	defer f.Close()

	var overrides struct {
		Overrides map[string]*Limits `yaml:"overrides"`
	}

	decoder := yaml.NewDecoder(f)
	decoder.SetStrict(true)
	if err := decoder.Decode(&overrides); err != nil {
		return nil, err
	}

	overridesAsInterface := make(map[string]interface{}, len(overrides.Overrides))
	for userID, limits := range overrides.Overrides {
		overridesAsInterface[userID] = limits
	}
	return overridesAsInterface, nil
}

@ -0,0 +1,24 @@
package validation
import "github.com/prometheus/client_golang/prometheus"
const (
discardReasonLabel = "reason"
// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"
)
// DiscardedSamples is a metric of the number of discarded samples, by reason.
var DiscardedSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "loki_discarded_samples_total",
Help: "The total number of samples that were discarded.",
},
[]string{discardReasonLabel, "user"},
)
func init() {
prometheus.MustRegister(DiscardedSamples)
}

@ -78,11 +78,11 @@ type store struct {
index IndexClient
chunks ObjectClient
schema Schema
limits *validation.Overrides
limits StoreLimits
*Fetcher
}
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) {
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits StoreLimits) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err

@ -3,13 +3,18 @@ package chunk
import (
"context"
"sort"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/cortexproject/cortex/pkg/util/validation"
)
// StoreLimits helps get Limits specific to Queries for Stores.
// The store constructors in this package accept this interface for their
// limits argument, so any type providing these two per-user lookups can be
// passed in.
type StoreLimits interface {
// MaxChunksPerQuery returns the per-query chunk limit that applies to userID.
MaxChunksPerQuery(userID string) int
// MaxQueryLength returns the limit on a query's length (in time) for userID.
MaxQueryLength(userID string) time.Duration
}
// Store for chunks.
type Store interface {
Put(ctx context.Context, chunks []Chunk) error
@ -45,7 +50,7 @@ func NewCompositeStore() CompositeStore {
}
// AddPeriod adds the configuration for a period of time to the CompositeStore
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks ObjectClient, limits *validation.Overrides) error {
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks ObjectClient, limits StoreLimits) error {
schema := cfg.CreateSchema()
var store Store
var err error

@ -16,7 +16,6 @@ import (
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)
// CardinalityExceededError is returned when the user reads a row that
@ -67,7 +66,7 @@ type seriesStore struct {
writeDedupeCache cache.Cache
}
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) {
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits StoreLimits) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err

@ -16,7 +16,6 @@ import (
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)
var (
@ -46,10 +45,10 @@ type cachingIndexClient struct {
chunk.IndexClient
cache cache.Cache
validity time.Duration
limits *validation.Overrides
limits StoreLimits
}
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits *validation.Overrides) chunk.IndexClient {
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits) chunk.IndexClient {
if c == nil {
return client
}

@ -14,11 +14,17 @@ import (
"github.com/cortexproject/cortex/pkg/chunk/gcp"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
)
// StoreLimits helps get Limits specific to Queries for Stores.
// NewStore in this package accepts this interface for its limits argument, so
// any type providing these three per-user lookups can be passed in.
type StoreLimits interface {
// CardinalityLimit returns the cardinality limit that applies to userID.
CardinalityLimit(userID string) int
// MaxChunksPerQuery returns the per-query chunk limit that applies to userID.
MaxChunksPerQuery(userID string) int
// MaxQueryLength returns the limit on a query's length (in time) for userID.
MaxQueryLength(userID string) time.Duration
}
// Config chooses which storage client to use.
type Config struct {
AWSStorageConfig aws.StorageConfig `yaml:"aws"`
@ -47,7 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}
// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (chunk.Store, error) {
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig)
if err != nil {
return nil, err

@ -40,8 +40,12 @@ const tpl = `
</tr>
</thead>
<tbody>
{{ range .Ingesters }}
{{ range $i, $ing := .Ingesters }}
{{ if mod $i 2 }}
<tr>
{{ else }}
<tr bgcolor="#BEBEBE">
{{ end }}
<td>{{ .ID }}</td>
<td>{{ .State }}</td>
<td>{{ .Address }}</td>
@ -61,7 +65,9 @@ const tpl = `
var tmpl *template.Template
func init() {
tmpl = template.Must(template.New("webpage").Parse(tpl))
t := template.New("webpage")
t.Funcs(template.FuncMap{"mod": func(i, j int) bool { return i%j == 0 }})
tmpl = template.Must(t.Parse(tpl))
}
func (r *Ring) forget(ctx context.Context, id string) error {

@ -24,25 +24,28 @@ func (cfg *BackoffConfig) RegisterFlags(prefix string, f *flag.FlagSet) {
// Backoff implements exponential backoff with randomized wait times
type Backoff struct {
cfg BackoffConfig
ctx context.Context
numRetries int
duration time.Duration
cfg BackoffConfig
ctx context.Context
numRetries int
nextDelayMin time.Duration
nextDelayMax time.Duration
}
// NewBackoff creates a Backoff object. Pass a Context that can also terminate the operation.
func NewBackoff(ctx context.Context, cfg BackoffConfig) *Backoff {
return &Backoff{
cfg: cfg,
ctx: ctx,
duration: cfg.MinBackoff,
cfg: cfg,
ctx: ctx,
nextDelayMin: cfg.MinBackoff,
nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff),
}
}
// Reset the Backoff back to its initial condition
func (b *Backoff) Reset() {
b.numRetries = 0
b.duration = b.cfg.MinBackoff
b.nextDelayMin = b.cfg.MinBackoff
b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff)
}
// Ongoing returns true if caller should keep going
@ -70,18 +73,45 @@ func (b *Backoff) NumRetries() int {
// Wait sleeps for the backoff time then increases the retry count and backoff time
// Returns immediately if Context is terminated
func (b *Backoff) Wait() {
b.numRetries++
// Based on the "Full Jitter" approach from https://www.awsarchitectureblog.com/2015/03/backoff.html
// sleep = random_between(0, min(cap, base * 2 ** attempt))
// Increase the number of retries and get the next delay
sleepTime := b.nextDelay()
if b.Ongoing() {
sleepTime := time.Duration(rand.Int63n(int64(b.duration)))
select {
case <-b.ctx.Done():
case <-time.After(sleepTime):
}
}
b.duration = b.duration * 2
if b.duration > b.cfg.MaxBackoff {
b.duration = b.cfg.MaxBackoff
}
// nextDelay increments the retry counter and returns how long to sleep before
// the next attempt.
//
// The delay is nextDelayMin plus a random amount below the width of the
// current [nextDelayMin, nextDelayMax) range. After a delay is picked, both
// ends of the range are doubled (capped at cfg.MaxBackoff) unless the upper
// end has already reached cfg.MaxBackoff.
func (b *Backoff) nextDelay() time.Duration {
	b.numRetries++

	lo, hi := b.nextDelayMin, b.nextDelayMax

	// Min and max are equal, or a misconfiguration put max below min: there
	// is no range to draw jitter from, so the lower bound is used as is.
	if lo >= hi {
		return lo
	}

	// Jitter within the current exponential backoff range.
	jitter := rand.Int63n(int64(hi - lo))
	delay := lo + time.Duration(jitter)

	// Widen the range for the following call, unless the cap is reached.
	if hi < b.cfg.MaxBackoff {
		b.nextDelayMin = doubleDuration(lo, b.cfg.MaxBackoff)
		b.nextDelayMax = doubleDuration(hi, b.cfg.MaxBackoff)
	}

	return delay
}
// doubleDuration returns value doubled, capped at max.
//
// A value above half of the largest representable Duration cannot be doubled
// without wrapping around to a negative number, which would then pass the
// "<= max" comparison. Such values are mapped straight to max.
func doubleDuration(value time.Duration, max time.Duration) time.Duration {
	const halfMaxDuration = time.Duration(1<<63-1) / 2

	// The true doubled value exceeds every representable max, so cap it.
	if value > halfMaxDuration {
		return max
	}

	if doubled := value * 2; doubled <= max {
		return doubled
	}
	return max
}

@ -30,6 +30,7 @@ type Limits struct {
MaxSamplesPerQuery int `yaml:"max_samples_per_query"`
MaxSeriesPerUser int `yaml:"max_series_per_user"`
MaxSeriesPerMetric int `yaml:"max_series_per_metric"`
MinChunkLength int `yaml:"min_chunk_length"`
// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_chunks_per_query"`
@ -61,6 +62,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return.")
f.IntVar(&l.MaxSeriesPerUser, "ingester.max-series-per-user", 5000000, "Maximum number of active series per user.")
f.IntVar(&l.MaxSeriesPerMetric, "ingester.max-series-per-metric", 50000, "Maximum number of active series per metric name.")
f.IntVar(&l.MinChunkLength, "ingester.min-chunk-length", 0, "Minimum number of samples in an idle chunk to flush it to the store. Use with care, if chunks are less than this size they will be discarded.")
f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
@ -229,6 +231,11 @@ func (o *Overrides) CardinalityLimit(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).CardinalityLimit
}
// MinChunkLength returns the minimum size of chunk that will be saved by ingesters
func (o *Overrides) MinChunkLength(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MinChunkLength
}
// Loads overrides and returns the limits as an interface to store them in OverridesManager.
// We need to implement it here since OverridesManager must store type Limits in an interface but
// it doesn't know its definition to initialize it.

@ -1,85 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package procfs
import (
"fmt"
"io/ioutil"
"net"
"strings"
)
// ARPEntry contains a single row of the columnar data represented in
// /proc/net/arp.
type ARPEntry struct {
// IP address
IPAddr net.IP
// MAC address
HWAddr net.HardwareAddr
// Name of the device
Device string
}
// GatherARPEntries reads the "net/arp" file under the proc mount and returns
// its rows as a slice of ARPEntry. A read failure is reported together with
// the path that was attempted.
func (fs FS) GatherARPEntries() ([]ARPEntry, error) {
	data, err := ioutil.ReadFile(fs.proc.Path("net/arp"))
	if err == nil {
		return parseARPEntries(data)
	}
	return nil, fmt.Errorf("error reading arp %s: %s", fs.proc.Path("net/arp"), err)
}
// parseARPEntries turns the raw contents of the arp table into ARPEntry
// values, one per data row.
//
// Rows are classified by their number of whitespace-separated columns: empty
// rows and rows with the header's width are skipped, rows with the data width
// are parsed, and any other width is reported as an error.
func parseARPEntries(data []byte) ([]ARPEntry, error) {
	const (
		expectedDataWidth   = 6
		expectedHeaderWidth = 9
	)

	entries := make([]ARPEntry, 0)
	for _, line := range strings.Split(string(data), "\n") {
		columns := strings.Fields(line)

		switch width := len(columns); width {
		case 0, expectedHeaderWidth:
			continue
		case expectedDataWidth:
			entry, err := parseARPEntry(columns)
			if err != nil {
				return []ARPEntry{}, fmt.Errorf("failed to parse ARP entry: %s", err)
			}
			entries = append(entries, entry)
		default:
			return []ARPEntry{}, fmt.Errorf("%d columns were detected, but %d were expected", width, expectedDataWidth)
		}
	}

	return entries, nil
}
// parseARPEntry builds an ARPEntry from the columns of one data row: column 0
// is the IP address, column 3 the hardware address and column 5 the device.
//
// The hardware address is parsed with net.ParseMAC. Converting the column text
// directly to net.HardwareAddr would store the ASCII bytes of the string
// instead of the address bytes. An error is returned when fewer than six
// columns are supplied or when the hardware address cannot be parsed.
func parseARPEntry(columns []string) (ARPEntry, error) {
	// Guard the indexing below; the highest column read is columns[5].
	if len(columns) < 6 {
		return ARPEntry{}, fmt.Errorf("%d columns were detected, but %d were expected", len(columns), 6)
	}

	mac, err := net.ParseMAC(columns[3])
	if err != nil {
		return ARPEntry{}, err
	}

	entry := ARPEntry{
		IPAddr: net.ParseIP(columns[0]),
		HWAddr: mac,
		Device: columns[5],
	}
	return entry, nil
}

@ -1,131 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package procfs
import (
"bytes"
"fmt"
"io/ioutil"
"strconv"
"strings"
"github.com/prometheus/procfs/internal/util"
)
// Crypto holds info parsed from /proc/crypto.
type Crypto struct {
Alignmask *uint64
Async bool
Blocksize *uint64
Chunksize *uint64
Ctxsize *uint64
Digestsize *uint64
Driver string
Geniv string
Internal string
Ivsize *uint64
Maxauthsize *uint64
MaxKeysize *uint64
MinKeysize *uint64
Module string
Name string
Priority *int64
Refcnt *int64
Seedsize *uint64
Selftest string
Type string
Walksize *uint64
}
// Crypto reads the "crypto" file under the proc mount (/proc/crypto) and
// returns one Crypto value per block found in it. More information available here:
// https://kernel.readthedocs.io/en/sphinx-samples/crypto-API.html
//
// Both a read failure and a parse failure are reported with the path of the
// file that was attempted.
func (fs FS) Crypto() ([]Crypto, error) {
	data, err := ioutil.ReadFile(fs.proc.Path("crypto"))
	if err != nil {
		return nil, fmt.Errorf("error parsing crypto %s: %s", fs.proc.Path("crypto"), err)
	}

	parsed, err := parseCrypto(data)
	if err == nil {
		return parsed, nil
	}
	return nil, fmt.Errorf("error parsing crypto %s: %s", fs.proc.Path("crypto"), err)
}
// parseCrypto splits the raw contents of /proc/crypto into blocks separated by
// a blank line and decodes the "key : value" lines of each block into a
// Crypto value.
//
// Every block contributes exactly one element to the result, including a block
// in which no key was recognised. Lines that are blank, start with a space, or
// contain no ':' separator are ignored; unknown keys are skipped. The returned
// error is always nil.
func parseCrypto(cryptoData []byte) ([]Crypto, error) {
	crypto := []Crypto{}

	cryptoBlocks := bytes.Split(cryptoData, []byte("\n\n"))

	for _, block := range cryptoBlocks {
		var newCryptoElem Crypto

		lines := strings.Split(string(block), "\n")
		for _, line := range lines {
			if strings.TrimSpace(line) == "" || line[0] == ' ' {
				continue
			}

			// Split on the first ':' only, so a value that itself contains
			// a ':' is kept whole. A line without any ':' has no value to
			// read and is skipped rather than indexed out of range.
			fields := strings.SplitN(line, ":", 2)
			if len(fields) != 2 {
				continue
			}
			key := strings.TrimSpace(fields[0])
			value := strings.TrimSpace(fields[1])
			vp := util.NewValueParser(value)

			switch key {
			case "async":
				// An unparsable boolean leaves Async at its zero value.
				b, err := strconv.ParseBool(value)
				if err == nil {
					newCryptoElem.Async = b
				}
			case "blocksize":
				newCryptoElem.Blocksize = vp.PUInt64()
			case "chunksize":
				newCryptoElem.Chunksize = vp.PUInt64()
			case "digestsize":
				newCryptoElem.Digestsize = vp.PUInt64()
			case "driver":
				newCryptoElem.Driver = value
			case "geniv":
				newCryptoElem.Geniv = value
			case "internal":
				newCryptoElem.Internal = value
			case "ivsize":
				newCryptoElem.Ivsize = vp.PUInt64()
			case "maxauthsize":
				newCryptoElem.Maxauthsize = vp.PUInt64()
			case "max keysize":
				newCryptoElem.MaxKeysize = vp.PUInt64()
			case "min keysize":
				newCryptoElem.MinKeysize = vp.PUInt64()
			case "module":
				newCryptoElem.Module = value
			case "name":
				newCryptoElem.Name = value
			case "priority":
				newCryptoElem.Priority = vp.PInt64()
			case "refcnt":
				newCryptoElem.Refcnt = vp.PInt64()
			case "seedsize":
				newCryptoElem.Seedsize = vp.PUInt64()
			case "selftest":
				newCryptoElem.Selftest = value
			case "type":
				newCryptoElem.Type = value
			case "walksize":
				newCryptoElem.Walksize = vp.PUInt64()
			}
		}
		crypto = append(crypto, newCryptoElem)
	}
	return crypto, nil
}

File diff suppressed because it is too large Load Diff

@ -1,88 +0,0 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"io/ioutil"
"strconv"
"strings"
)
// ParseUint32s converts each base-10 string in ss to a uint32, keeping the
// input order. The first string that does not parse as a 32-bit unsigned
// integer aborts the conversion: the result is then nil with the parse error.
func ParseUint32s(ss []string) ([]uint32, error) {
	parsed := make([]uint32, len(ss))
	for i := range ss {
		v, err := strconv.ParseUint(ss[i], 10, 32)
		if err != nil {
			return nil, err
		}
		parsed[i] = uint32(v)
	}
	return parsed, nil
}
// ParseUint64s converts each base-10 string in ss to a uint64, keeping the
// input order. The first string that does not parse as a 64-bit unsigned
// integer aborts the conversion: the result is then nil with the parse error.
func ParseUint64s(ss []string) ([]uint64, error) {
	parsed := make([]uint64, len(ss))
	for i := range ss {
		v, err := strconv.ParseUint(ss[i], 10, 64)
		if err != nil {
			return nil, err
		}
		parsed[i] = v
	}
	return parsed, nil
}
// ParsePInt64s converts each base-10 string in ss to an int64 and returns
// pointers to the parsed values, keeping the input order. Each pointer refers
// to its own variable. The first string that does not parse aborts the
// conversion: the result is then nil with the parse error.
func ParsePInt64s(ss []string) ([]*int64, error) {
	parsed := make([]*int64, len(ss))
	for i, s := range ss {
		v, err := strconv.ParseInt(s, 10, 64)
		if err != nil {
			return nil, err
		}
		parsed[i] = &v
	}
	return parsed, nil
}
// ReadUintFromFile reads the whole file at path, strips surrounding
// whitespace, and parses what remains as a base-10 uint64. A read failure
// yields 0 and the read error; otherwise the result of the parse is returned.
func ReadUintFromFile(path string) (uint64, error) {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return 0, err
	}
	text := strings.TrimSpace(string(raw))
	return strconv.ParseUint(text, 10, 64)
}
// ParseBool maps the string "enabled" to a pointer to true and "disabled" to
// a pointer to false. Any other input yields nil.
func ParseBool(b string) *bool {
	if b != "enabled" && b != "disabled" {
		return nil
	}
	truth := b == "enabled"
	return &truth
}

@ -1,45 +0,0 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build linux,!appengine
package util
import (
"bytes"
"os"
"syscall"
)
// SysReadFile is a simplified ioutil.ReadFile that invokes syscall.Read directly.
// https://github.com/prometheus/node_exporter/pull/728/files
//
// It performs a single read of at most 128 bytes and returns what was read
// with surrounding whitespace trimmed. An open or read failure yields an
// empty string and the error.
func SysReadFile(file string) (string, error) {
	f, err := os.Open(file)
	if err != nil {
		return "", err
	}
	defer f.Close()

	// On some machines, hwmon drivers are broken and return EAGAIN. This causes
	// Go's ioutil.ReadFile implementation to poll forever.
	//
	// Since we either want to read data or bail immediately, do the simplest
	// possible read using syscall directly.
	var scratch [128]byte
	n, err := syscall.Read(int(f.Fd()), scratch[:])
	if err != nil {
		return "", err
	}

	trimmed := bytes.TrimSpace(scratch[:n])
	return string(trimmed), nil
}

@ -1,26 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build linux,appengine !linux
package util
import (
"fmt"
)
// SysReadFile is the fallback used for builds that do not support the read
// syscall (for example Windows, or Linux on Google App Engine). It ignores
// its argument and always returns an error.
func SysReadFile(file string) (string, error) {
	err := fmt.Errorf("not supported on this platform")
	return "", err
}

@ -1,77 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"strconv"
)
// TODO(mdlayher): util packages are an anti-pattern and this should be moved
// somewhere else that is more focused in the future.
// A ValueParser wraps a single string and converts it to numeric types on
// request. The conversion methods do not return errors themselves: a failed
// conversion is recorded on the parser and every later conversion returns
// nil. Callers must therefore check Err after invoking the conversions.
type ValueParser struct {
	v   string
	err error
}

// NewValueParser returns a ValueParser for the input string v.
func NewValueParser(v string) *ValueParser {
	vp := new(ValueParser)
	vp.v = v
	return vp
}

// PInt64 parses the underlying value as an int64 and returns a pointer to it.
// It returns nil if this or an earlier conversion on the parser failed.
func (vp *ValueParser) PInt64() *int64 {
	if vp.err != nil {
		return nil
	}

	// Base 0 makes ParseInt infer the base from the string's prefix, if any.
	n, err := strconv.ParseInt(vp.v, 0, 64)
	if err == nil {
		return &n
	}
	vp.err = err
	return nil
}

// PUInt64 parses the underlying value as a uint64 and returns a pointer to
// it. It returns nil if this or an earlier conversion on the parser failed.
func (vp *ValueParser) PUInt64() *uint64 {
	if vp.err != nil {
		return nil
	}

	// Base 0 makes ParseUint infer the base from the string's prefix, if any.
	n, err := strconv.ParseUint(vp.v, 0, 64)
	if err == nil {
		return &n
	}
	vp.err = err
	return nil
}

// Err returns the error recorded by a failed conversion, or nil if every
// conversion so far succeeded.
func (vp *ValueParser) Err() error {
	return vp.err
}

@ -1,91 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package procfs
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
)
// For the proc file format details,
// see https://elixir.bootlin.com/linux/v4.17/source/net/core/net-procfs.c#L162
// and https://elixir.bootlin.com/linux/v4.17/source/include/linux/netdevice.h#L2810.
// SoftnetEntry contains a single row of data from /proc/net/softnet_stat.
// Only the first three columns of a row are represented; each is parsed as a
// hexadecimal number by parseSoftnetEntry.
type SoftnetEntry struct {
// Number of processed packets (column 0).
Processed uint
// Number of dropped packets (column 1).
Dropped uint
// Number of times processing packets ran out of quota (column 2).
TimeSqueezed uint
}
// GatherSoftnetStats reads net/softnet_stat beneath the proc mount point,
// parses the relevant columns of every row, and returns one SoftnetEntry per
// row. A read failure is reported with the file path included in the message.
func (fs FS) GatherSoftnetStats() ([]SoftnetEntry, error) {
	path := fs.proc.Path("net/softnet_stat")

	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("error reading softnet %s: %s", path, err)
	}

	return parseSoftnetEntries(raw)
}
// parseSoftnetEntries parses the contents of a softnet_stat file. Every
// non-blank line becomes one SoftnetEntry, in file order. Blank lines are
// skipped. A line with too few columns, or with an unparsable value in one of
// the consumed columns, aborts parsing and returns an empty slice plus an
// error.
func parseSoftnetEntries(data []byte) ([]SoftnetEntry, error) {
	// The number of columns the kernel prints has changed between releases
	// (the 4.17 source referenced for this file prints 11; other releases
	// print fewer or more — NOTE(review): confirm against the kernels you
	// target). Only the leading columns are consumed here, so require a
	// minimum width instead of an exact one; trailing columns are ignored.
	const minColumns = 9

	lines := strings.Split(string(data), "\n")
	entries := make([]SoftnetEntry, 0, len(lines))

	for _, line := range lines {
		columns := strings.Fields(line)
		width := len(columns)
		if width == 0 {
			continue
		}
		if width < minColumns {
			return []SoftnetEntry{}, fmt.Errorf("%d columns were detected, but at least %d were expected", width, minColumns)
		}

		entry, err := parseSoftnetEntry(columns)
		if err != nil {
			return []SoftnetEntry{}, err
		}
		entries = append(entries, entry)
	}

	return entries, nil
}
// parseSoftnetEntry builds a SoftnetEntry from the first three columns of a
// softnet_stat row. Each column is parsed as a hexadecimal number of at most
// 32 bits. The first column that fails to parse yields a zero SoftnetEntry
// and an error naming that column. The caller must supply at least three
// columns.
func parseSoftnetEntry(columns []string) (SoftnetEntry, error) {
	processed, err := strconv.ParseUint(columns[0], 16, 32)
	if err != nil {
		return SoftnetEntry{}, fmt.Errorf("Unable to parse column 0: %s", err)
	}

	dropped, err := strconv.ParseUint(columns[1], 16, 32)
	if err != nil {
		return SoftnetEntry{}, fmt.Errorf("Unable to parse column 1: %s", err)
	}

	timeSqueezed, err := strconv.ParseUint(columns[2], 16, 32)
	if err != nil {
		return SoftnetEntry{}, fmt.Errorf("Unable to parse column 2: %s", err)
	}

	var entry SoftnetEntry
	entry.Processed = uint(processed)
	entry.Dropped = uint(dropped)
	entry.TimeSqueezed = uint(timeSqueezed)
	return entry, nil
}

@ -279,33 +279,3 @@ func (p Proc) fileDescriptors() ([]string, error) {
// path prefixes the given elements with the decimal PID of p and resolves
// the resulting element list through p.fs.Path.
func (p Proc) path(pa ...string) string {
	elems := make([]string, 0, len(pa)+1)
	elems = append(elems, strconv.Itoa(p.PID))
	elems = append(elems, pa...)
	return p.fs.Path(elems...)
}
// FileDescriptorsInfo retrieves information about all file descriptors of
// the process. A failure to list the descriptors is returned to the caller;
// a failure to read the info of an individual descriptor is skipped, so the
// result may contain fewer entries than there are descriptors.
func (p Proc) FileDescriptorsInfo() (ProcFDInfos, error) {
	names, err := p.fileDescriptors()
	if err != nil {
		return nil, err
	}

	var infos ProcFDInfos
	for i := range names {
		info, err := p.FDInfo(names[i])
		if err == nil {
			infos = append(infos, *info)
		}
	}

	return infos, nil
}
// Schedstat returns task scheduling information for the process, parsed from
// the process's "schedstat" file. A read failure yields a zero ProcSchedstat
// and the read error.
func (p Proc) Schedstat() (ProcSchedstat, error) {
	var none ProcSchedstat

	raw, err := ioutil.ReadFile(p.path("schedstat"))
	if err != nil {
		return none, err
	}

	return parseProcSchedstat(string(raw))
}

@ -1,132 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package procfs
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
)
// Patterns used by FDInfo to classify the lines of a process's fdinfo file.
var (
// rPos captures the decimal value of a "pos:" line.
rPos = regexp.MustCompile(`^pos:\s+(\d+)$`)
// rFlags captures the digits of a "flags:" line.
rFlags = regexp.MustCompile(`^flags:\s+(\d+)$`)
// rMntID captures the decimal value of a "mnt_id:" line.
rMntID = regexp.MustCompile(`^mnt_id:\s+(\d+)$`)
// rInotify matches any line that begins with "inotify".
rInotify = regexp.MustCompile(`^inotify`)
)
// ProcFDInfo represents file descriptor information. All values are kept as
// the strings captured from the fdinfo file; no numeric conversion is done.
type ProcFDInfo struct {
// File descriptor
FD string
// File offset
Pos string
// File access mode and status flags
Flags string
// Mount point ID
MntID string
// List of inotify lines (structured) in the fdinfo file (kernel 3.8+ only)
InotifyInfos []InotifyInfo
}
// FDInfo reads the fdinfo file of descriptor fd for this process and returns
// its parsed contents. On kernels older than 3.8, InotifyInfos will always
// be empty.
//
// An error is returned when the file cannot be opened or read, when an
// inotify line cannot be parsed, or when scanning the contents fails (for
// example because a line exceeds the scanner's buffer). Lines that match
// none of the known patterns are ignored.
func (p Proc) FDInfo(fd string) (*ProcFDInfo, error) {
	f, err := os.Open(p.path("fdinfo", fd))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	fdinfo, err := ioutil.ReadAll(f)
	if err != nil {
		return nil, fmt.Errorf("could not read %s: %s", f.Name(), err)
	}

	info := &ProcFDInfo{FD: fd}

	scanner := bufio.NewScanner(strings.NewReader(string(fdinfo)))
	for scanner.Scan() {
		text := scanner.Text()

		// Evaluate each pattern once and reuse the submatches.
		if m := rPos.FindStringSubmatch(text); m != nil {
			info.Pos = m[1]
		} else if m := rFlags.FindStringSubmatch(text); m != nil {
			info.Flags = m[1]
		} else if m := rMntID.FindStringSubmatch(text); m != nil {
			info.MntID = m[1]
		} else if rInotify.MatchString(text) {
			newInotify, err := parseInotifyInfo(text)
			if err != nil {
				return nil, err
			}
			info.InotifyInfos = append(info.InotifyInfos, *newInotify)
		}
	}
	// Scan returning false may mean a failure rather than end of input;
	// do not hand back silently truncated data.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("could not parse %s: %s", f.Name(), err)
	}

	return info, nil
}
// InotifyInfo represents a single inotify line in the fdinfo file. All values
// are kept as the hexadecimal strings found in the file.
type InotifyInfo struct {
	// Watch descriptor number
	WD string
	// Inode number
	Ino string
	// Device ID
	Sdev string
	// Mask of events being monitored
	Mask string
}

// rInotifyParts captures the wd, ino, sdev and mask values of an inotify
// line. It is compiled once instead of on every parseInotifyInfo call.
var rInotifyParts = regexp.MustCompile(`^inotify\s+wd:([0-9a-f]+)\s+ino:([0-9a-f]+)\s+sdev:([0-9a-f]+)\s+mask:([0-9a-f]+)`)

// parseInotifyInfo parses one inotify line of an fdinfo file. Only available
// on kernel 3.8+. It returns an error, rather than panicking, when the line
// does not have the expected "inotify wd:… ino:… sdev:… mask:…" layout.
func parseInotifyInfo(line string) (*InotifyInfo, error) {
	m := rInotifyParts.FindStringSubmatch(line)
	if m == nil {
		return nil, fmt.Errorf("invalid inotify entry: %q", line)
	}

	return &InotifyInfo{
		WD:   m[1],
		Ino:  m[2],
		Sdev: m[3],
		Mask: m[4],
	}, nil
}
// ProcFDInfos represents a list of ProcFDInfo structs. Its Len, Swap and
// Less methods allow the list to be ordered by the FD field, which is
// compared as a string.
type ProcFDInfos []ProcFDInfo

// Len returns the number of entries in the list.
func (p ProcFDInfos) Len() int {
	return len(p)
}

// Swap exchanges the entries at positions i and j.
func (p ProcFDInfos) Swap(i, j int) {
	tmp := p[i]
	p[i] = p[j]
	p[j] = tmp
}

// Less reports whether the FD string at position i sorts before the one at
// position j.
func (p ProcFDInfos) Less(i, j int) bool {
	return p[j].FD > p[i].FD
}

// InotifyWatchLen returns the total number of inotify watches across all
// entries. The returned error is always nil.
func (p ProcFDInfos) InotifyWatchLen() (int, error) {
	total := 0
	for i := range p {
		total += len(p[i].InotifyInfos)
	}
	return total, nil
}

@ -1,118 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package procfs
import (
"bufio"
"errors"
"os"
"regexp"
"strconv"
)
// Patterns used to parse schedstat files.
var (
// cpuLineRE matches a "cpu<N>" line followed by nine space-separated
// decimal fields; group 1 is the CPU number and groups 2-10 are the fields.
cpuLineRE = regexp.MustCompile(`cpu(\d+) (\d+) (\d+) (\d+) (\d+) (\d+) (\d+) (\d+) (\d+) (\d+)`)
// procLineRE matches three space-separated decimal fields.
procLineRE = regexp.MustCompile(`(\d+) (\d+) (\d+)`)
)
// Schedstat contains scheduler statistics from /proc/schedstat
//
// See
// https://www.kernel.org/doc/Documentation/scheduler/sched-stats.txt
// for a detailed description of what these numbers mean.
//
// Note the current kernel documentation claims some of the time units are in
// jiffies when they are actually in nanoseconds since 2.6.23 with the
// introduction of CFS. A fix to the documentation is pending. See
// https://lore.kernel.org/patchwork/project/lkml/list/?series=403473
type Schedstat struct {
// CPUs holds one entry per successfully parsed "cpu<N>" line, in file order.
CPUs []*SchedstatCPU
}
// SchedstatCPU contains the values from one "cpu<N>" line. Only the last
// three of the nine numeric fields on the line are retained.
type SchedstatCPU struct {
// CPUNum is the <N> of the "cpu<N>" prefix, kept as a string.
CPUNum string
// The seventh, eighth and ninth numeric fields of the line, respectively.
RunningNanoseconds uint64
WaitingNanoseconds uint64
RunTimeslices uint64
}
// ProcSchedstat contains the values from /proc/<pid>/schedstat. The fields
// correspond, in order, to the three numbers in that file.
type ProcSchedstat struct {
RunningNanoseconds uint64
WaitingNanoseconds uint64
RunTimeslices uint64
}
// Schedstat reads the "schedstat" file beneath the proc mount point and
// returns the per-CPU scheduler statistics it contains.
//
// Lines that do not match the "cpu<N>" layout are ignored, as are matching
// lines whose retained fields do not fit in a uint64. An error is returned
// when the file cannot be opened or when reading it fails part-way through.
func (fs FS) Schedstat() (*Schedstat, error) {
	file, err := os.Open(fs.proc.Path("schedstat"))
	if err != nil {
		return nil, err
	}
	defer file.Close()

	stats := &Schedstat{}
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		match := cpuLineRE.FindStringSubmatch(scanner.Text())
		if match == nil {
			continue
		}

		cpu := &SchedstatCPU{CPUNum: match[1]}
		if cpu.RunningNanoseconds, err = strconv.ParseUint(match[8], 10, 64); err != nil {
			continue
		}
		if cpu.WaitingNanoseconds, err = strconv.ParseUint(match[9], 10, 64); err != nil {
			continue
		}
		if cpu.RunTimeslices, err = strconv.ParseUint(match[10], 10, 64); err != nil {
			continue
		}

		stats.CPUs = append(stats.CPUs, cpu)
	}
	// Scan returning false may mean a read error rather than end of file;
	// surface it instead of returning partial statistics as if complete.
	if err := scanner.Err(); err != nil {
		return nil, err
	}

	return stats, nil
}
// parseProcSchedstat extracts the first run of three space-separated decimal
// numbers from contents and stores them, in order, as running time, waiting
// time and timeslice count. It returns an error when no such run exists or
// when one of the numbers does not fit in a uint64; in the latter case the
// fields parsed so far remain set in the returned value.
func parseProcSchedstat(contents string) (stats ProcSchedstat, err error) {
	fields := procLineRE.FindStringSubmatch(contents)
	if fields == nil {
		err = errors.New("could not parse schedstat")
		return stats, err
	}

	if stats.RunningNanoseconds, err = strconv.ParseUint(fields[1], 10, 64); err != nil {
		return stats, err
	}
	if stats.WaitingNanoseconds, err = strconv.ParseUint(fields[2], 10, 64); err != nil {
		return stats, err
	}
	stats.RunTimeslices, err = strconv.ParseUint(fields[3], 10, 64)
	return stats, err
}

@ -1,210 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows
package procfs
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"github.com/prometheus/procfs/internal/util"
)
// VM holds the settings read from the sys/vm directory of the proc
// filesystem. The interface is described at
// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
// Each setting is exposed as a single file.
// Each file contains one line with a single numerical value, except lowmem_reserve_ratio which holds an array
// and numa_zonelist_order (deprecated) which is a string.
//
// Numeric settings are pointers: a field stays nil when its file is missing
// or could not be read.
type VM struct {
AdminReserveKbytes *int64 // /proc/sys/vm/admin_reserve_kbytes
BlockDump *int64 // /proc/sys/vm/block_dump
CompactUnevictableAllowed *int64 // /proc/sys/vm/compact_unevictable_allowed
DirtyBackgroundBytes *int64 // /proc/sys/vm/dirty_background_bytes
DirtyBackgroundRatio *int64 // /proc/sys/vm/dirty_background_ratio
DirtyBytes *int64 // /proc/sys/vm/dirty_bytes
DirtyExpireCentisecs *int64 // /proc/sys/vm/dirty_expire_centisecs
DirtyRatio *int64 // /proc/sys/vm/dirty_ratio
DirtytimeExpireSeconds *int64 // /proc/sys/vm/dirtytime_expire_seconds
DirtyWritebackCentisecs *int64 // /proc/sys/vm/dirty_writeback_centisecs
DropCaches *int64 // /proc/sys/vm/drop_caches
ExtfragThreshold *int64 // /proc/sys/vm/extfrag_threshold
HugetlbShmGroup *int64 // /proc/sys/vm/hugetlb_shm_group
LaptopMode *int64 // /proc/sys/vm/laptop_mode
LegacyVaLayout *int64 // /proc/sys/vm/legacy_va_layout
LowmemReserveRatio []*int64 // /proc/sys/vm/lowmem_reserve_ratio
MaxMapCount *int64 // /proc/sys/vm/max_map_count
MemoryFailureEarlyKill *int64 // /proc/sys/vm/memory_failure_early_kill
MemoryFailureRecovery *int64 // /proc/sys/vm/memory_failure_recovery
MinFreeKbytes *int64 // /proc/sys/vm/min_free_kbytes
MinSlabRatio *int64 // /proc/sys/vm/min_slab_ratio
MinUnmappedRatio *int64 // /proc/sys/vm/min_unmapped_ratio
MmapMinAddr *int64 // /proc/sys/vm/mmap_min_addr
NrHugepages *int64 // /proc/sys/vm/nr_hugepages
NrHugepagesMempolicy *int64 // /proc/sys/vm/nr_hugepages_mempolicy
NrOvercommitHugepages *int64 // /proc/sys/vm/nr_overcommit_hugepages
NumaStat *int64 // /proc/sys/vm/numa_stat
NumaZonelistOrder string // /proc/sys/vm/numa_zonelist_order
OomDumpTasks *int64 // /proc/sys/vm/oom_dump_tasks
OomKillAllocatingTask *int64 // /proc/sys/vm/oom_kill_allocating_task
OvercommitKbytes *int64 // /proc/sys/vm/overcommit_kbytes
OvercommitMemory *int64 // /proc/sys/vm/overcommit_memory
OvercommitRatio *int64 // /proc/sys/vm/overcommit_ratio
PageCluster *int64 // /proc/sys/vm/page-cluster
PanicOnOom *int64 // /proc/sys/vm/panic_on_oom
PercpuPagelistFraction *int64 // /proc/sys/vm/percpu_pagelist_fraction
StatInterval *int64 // /proc/sys/vm/stat_interval
Swappiness *int64 // /proc/sys/vm/swappiness
UserReserveKbytes *int64 // /proc/sys/vm/user_reserve_kbytes
VfsCachePressure *int64 // /proc/sys/vm/vfs_cache_pressure
WatermarkBoostFactor *int64 // /proc/sys/vm/watermark_boost_factor
WatermarkScaleFactor *int64 // /proc/sys/vm/watermark_scale_factor
ZoneReclaimMode *int64 // /proc/sys/vm/zone_reclaim_mode
}
// VM reads the VM statistics from the specified `proc` filesystem.
//
// Every regular file in the sys/vm directory is read. Files that cannot be
// read are skipped, and files with unrecognized names are ignored. An error
// is returned when the directory itself cannot be inspected or listed, or
// when a recognized single-value file does not contain a valid integer.
func (fs FS) VM() (*VM, error) {
	dir := fs.proc.Path("sys/vm")

	info, err := os.Stat(dir)
	if err != nil {
		return nil, err
	}
	if !info.Mode().IsDir() {
		return nil, fmt.Errorf("%s is not a directory", dir)
	}

	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	vm := new(VM)

	// File name -> destination field for every single-integer setting.
	scalars := map[string]**int64{
		"admin_reserve_kbytes":        &vm.AdminReserveKbytes,
		"block_dump":                  &vm.BlockDump,
		"compact_unevictable_allowed": &vm.CompactUnevictableAllowed,
		"dirty_background_bytes":      &vm.DirtyBackgroundBytes,
		"dirty_background_ratio":      &vm.DirtyBackgroundRatio,
		"dirty_bytes":                 &vm.DirtyBytes,
		"dirty_expire_centisecs":      &vm.DirtyExpireCentisecs,
		"dirty_ratio":                 &vm.DirtyRatio,
		"dirtytime_expire_seconds":    &vm.DirtytimeExpireSeconds,
		"dirty_writeback_centisecs":   &vm.DirtyWritebackCentisecs,
		"drop_caches":                 &vm.DropCaches,
		"extfrag_threshold":           &vm.ExtfragThreshold,
		"hugetlb_shm_group":           &vm.HugetlbShmGroup,
		"laptop_mode":                 &vm.LaptopMode,
		"legacy_va_layout":            &vm.LegacyVaLayout,
		"max_map_count":               &vm.MaxMapCount,
		"memory_failure_early_kill":   &vm.MemoryFailureEarlyKill,
		"memory_failure_recovery":     &vm.MemoryFailureRecovery,
		"min_free_kbytes":             &vm.MinFreeKbytes,
		"min_slab_ratio":              &vm.MinSlabRatio,
		"min_unmapped_ratio":          &vm.MinUnmappedRatio,
		"mmap_min_addr":               &vm.MmapMinAddr,
		"nr_hugepages":                &vm.NrHugepages,
		"nr_hugepages_mempolicy":      &vm.NrHugepagesMempolicy,
		"nr_overcommit_hugepages":     &vm.NrOvercommitHugepages,
		"numa_stat":                   &vm.NumaStat,
		"oom_dump_tasks":              &vm.OomDumpTasks,
		"oom_kill_allocating_task":    &vm.OomKillAllocatingTask,
		"overcommit_kbytes":           &vm.OvercommitKbytes,
		"overcommit_memory":           &vm.OvercommitMemory,
		"overcommit_ratio":            &vm.OvercommitRatio,
		"page-cluster":                &vm.PageCluster,
		"panic_on_oom":                &vm.PanicOnOom,
		"percpu_pagelist_fraction":    &vm.PercpuPagelistFraction,
		"stat_interval":               &vm.StatInterval,
		"swappiness":                  &vm.Swappiness,
		"user_reserve_kbytes":         &vm.UserReserveKbytes,
		"vfs_cache_pressure":          &vm.VfsCachePressure,
		"watermark_boost_factor":      &vm.WatermarkBoostFactor,
		"watermark_scale_factor":      &vm.WatermarkScaleFactor,
		"zone_reclaim_mode":           &vm.ZoneReclaimMode,
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}

		// ignore errors on read, as there are some write only
		// in /proc/sys/vm
		value, err := util.SysReadFile(filepath.Join(dir, entry.Name()))
		if err != nil {
			continue
		}

		vp := util.NewValueParser(value)
		switch name := entry.Name(); name {
		case "lowmem_reserve_ratio":
			// Holds several whitespace-separated integers; each token gets
			// its own parser, whose error state is not inspected.
			tokens := strings.Fields(value)
			ratios := make([]*int64, 0, len(tokens))
			for _, token := range tokens {
				ratios = append(ratios, util.NewValueParser(token).PInt64())
			}
			vm.LowmemReserveRatio = ratios
		case "numa_zonelist_order":
			// Stored verbatim; this setting is a string, not a number.
			vm.NumaZonelistOrder = value
		default:
			if dst, ok := scalars[name]; ok {
				*dst = vp.PInt64()
			}
		}

		if err := vp.Err(); err != nil {
			return nil, err
		}
	}

	return vm, nil
}

@ -1,196 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows
package procfs
import (
"bytes"
"fmt"
"io/ioutil"
"regexp"
"strings"
"github.com/prometheus/procfs/internal/util"
)
// Zoneinfo holds info parsed from /proc/zoneinfo. One value is produced per
// "Node" block of the file. Counters are pointers: a field stays nil when the
// corresponding key does not appear in the block or its value could not be
// parsed as an integer.
type Zoneinfo struct {
// Node and Zone are the node number and zone name from the block's header
// line, kept as strings.
Node string
Zone string
NrFreePages *int64
Min *int64
Low *int64
High *int64
Scanned *int64
Spanned *int64
Present *int64
Managed *int64
NrActiveAnon *int64
NrInactiveAnon *int64
NrIsolatedAnon *int64
NrAnonPages *int64
NrAnonTransparentHugepages *int64
NrActiveFile *int64
NrInactiveFile *int64
NrIsolatedFile *int64
NrFilePages *int64
NrSlabReclaimable *int64
NrSlabUnreclaimable *int64
NrMlockStack *int64
NrKernelStack *int64
NrMapped *int64
NrDirty *int64
NrWriteback *int64
NrUnevictable *int64
NrShmem *int64
NrDirtied *int64
NrWritten *int64
NumaHit *int64
NumaMiss *int64
NumaForeign *int64
NumaInterleave *int64
NumaLocal *int64
NumaOther *int64
// Protection holds the comma-separated values of the "protection:" line.
Protection []*int64
}
// nodeZoneRE captures the node number (group 1) and zone name (group 2) from
// a "<node>, zone <name>" header line.
var nodeZoneRE = regexp.MustCompile(`(\d+), zone\s+(\w+)`)
// Zoneinfo parses the zoneinfo file (/proc/zoneinfo) and returns a slice of
// structs containing the relevant info. Read and parse failures are reported
// with the file path included in the message. More information available
// here:
// https://www.kernel.org/doc/Documentation/sysctl/vm.txt
func (fs FS) Zoneinfo() ([]Zoneinfo, error) {
	path := fs.proc.Path("zoneinfo")

	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("error reading zoneinfo %s: %s", path, err)
	}

	zones, err := parseZoneinfo(raw)
	if err != nil {
		return nil, fmt.Errorf("error parsing zoneinfo %s: %s", path, err)
	}

	return zones, nil
}
// parseZoneinfo splits the zoneinfo contents into blocks at every "\nNode"
// boundary and turns each block into one Zoneinfo value, in file order.
//
// Within a block, the header line supplies Node and Zone, a "per-node stats"
// line clears Zone, a "protection:" line supplies Protection, and each
// recognized "<key> <value>" line fills the matching counter. Values that
// fail to parse leave their field nil; no parse failure is reported, so the
// returned error is always nil.
func parseZoneinfo(zoneinfoData []byte) ([]Zoneinfo, error) {
	zones := []Zoneinfo{}

	for _, block := range bytes.Split(zoneinfoData, []byte("\nNode")) {
		var zone Zoneinfo

		// Key -> destination field for every single-integer counter.
		counters := map[string]**int64{
			"nr_free_pages":                 &zone.NrFreePages,
			"min":                           &zone.Min,
			"low":                           &zone.Low,
			"high":                          &zone.High,
			"scanned":                       &zone.Scanned,
			"spanned":                       &zone.Spanned,
			"present":                       &zone.Present,
			"managed":                       &zone.Managed,
			"nr_active_anon":                &zone.NrActiveAnon,
			"nr_inactive_anon":              &zone.NrInactiveAnon,
			"nr_isolated_anon":              &zone.NrIsolatedAnon,
			"nr_anon_pages":                 &zone.NrAnonPages,
			"nr_anon_transparent_hugepages": &zone.NrAnonTransparentHugepages,
			"nr_active_file":                &zone.NrActiveFile,
			"nr_inactive_file":              &zone.NrInactiveFile,
			"nr_isolated_file":              &zone.NrIsolatedFile,
			"nr_file_pages":                 &zone.NrFilePages,
			"nr_slab_reclaimable":           &zone.NrSlabReclaimable,
			"nr_slab_unreclaimable":         &zone.NrSlabUnreclaimable,
			"nr_mlock_stack":                &zone.NrMlockStack,
			"nr_kernel_stack":               &zone.NrKernelStack,
			"nr_mapped":                     &zone.NrMapped,
			"nr_dirty":                      &zone.NrDirty,
			"nr_writeback":                  &zone.NrWriteback,
			"nr_unevictable":                &zone.NrUnevictable,
			"nr_shmem":                      &zone.NrShmem,
			"nr_dirtied":                    &zone.NrDirtied,
			"nr_written":                    &zone.NrWritten,
			"numa_hit":                      &zone.NumaHit,
			"numa_miss":                     &zone.NumaMiss,
			"numa_foreign":                  &zone.NumaForeign,
			"numa_interleave":               &zone.NumaInterleave,
			"numa_local":                    &zone.NumaLocal,
			"numa_other":                    &zone.NumaOther,
		}

		for _, line := range strings.Split(string(block), "\n") {
			if header := nodeZoneRE.FindStringSubmatch(line); header != nil {
				zone.Node, zone.Zone = header[1], header[2]
				continue
			}
			if strings.HasPrefix(strings.TrimSpace(line), "per-node stats") {
				zone.Zone = ""
				continue
			}

			// Fields already discards leading and trailing whitespace.
			parts := strings.Fields(line)
			if len(parts) < 2 {
				continue
			}

			if parts[0] == "protection:" {
				// Strip the surrounding parentheses, then split the list.
				raw := strings.Split(line, ":")[1]
				raw = strings.Replace(raw, "(", "", 1)
				raw = strings.Replace(raw, ")", "", 1)
				tokens := strings.Split(strings.TrimSpace(raw), ", ")
				if vals, err := util.ParsePInt64s(tokens); err == nil {
					zone.Protection = vals
				}
				continue
			}

			if dst, ok := counters[parts[0]]; ok {
				*dst = util.NewValueParser(parts[1]).PInt64()
			}
		}

		zones = append(zones, zone)
	}

	return zones, nil
}

@ -257,15 +257,11 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u
nilAwareAdd(&cfg.Learners, id)
}
prs[id] = &tracker.Progress{
// Initializing the Progress with the last index means that the follower
// can be probed (with the last index).
// We initialize Progress.Next with lastIndex+1 so that the peer will be
// probed without an index first.
//
// TODO(tbg): seems awfully optimistic. Using the first index would be
// better. The general expectation here is that the follower has no log
// at all (and will thus likely need a snapshot), though the app may
// have applied a snapshot out of band before adding the replica (thus
// making the first index the better choice).
Next: c.LastIndex,
// TODO(tbg): verify that, this is just my best guess.
Next: c.LastIndex + 1,
Match: 0,
Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
IsLearner: isLearner,

@ -55,7 +55,10 @@ func (u *unstable) maybeLastIndex() (uint64, bool) {
// is any.
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i < u.offset {
if u.snapshot != nil && u.snapshot.Metadata.Index == i {
if u.snapshot == nil {
return 0, false
}
if u.snapshot.Metadata.Index == i {
return u.snapshot.Metadata.Term, true
}
return 0, false
@ -68,7 +71,6 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i > last {
return 0, false
}
return u.entries[i-u.offset].Term, true
}

@ -1036,36 +1036,10 @@ func stepLeader(r *raft, m pb.Message) error {
for i := range m.Entries {
e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
}
if cc != nil {
alreadyPending := r.pendingConfIndex > r.raftLog.applied
alreadyJoint := len(r.prs.Config.Voters[1]) > 0
wantsLeaveJoint := len(cc.AsV2().Changes) == 0
var refused string
if alreadyPending {
refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
} else if alreadyJoint && !wantsLeaveJoint {
refused = "must transition out of joint config first"
} else if !alreadyJoint && wantsLeaveJoint {
refused = "not in joint state; refusing empty conf change"
}
if refused != "" {
r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
if e.Type == pb.EntryConfChange || e.Type == pb.EntryConfChangeV2 {
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("%x propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
r.id, e, r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
@ -1553,18 +1527,10 @@ func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.Co
if r.state != StateLeader || len(cs.Voters) == 0 {
return cs
}
if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
// The quorum size may have been reduced (but not to zero), so see if
// any pending entries can be committed.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.prs.Visit(func(id uint64, pr *tracker.Progress) {
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
// If the leadTransferee was removed, abort the leadership transfer.
if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 {

@ -77,8 +77,8 @@ func DescribeSoftState(ss SoftState) string {
func DescribeConfState(state pb.ConfState) string {
return fmt.Sprintf(
"Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v AutoLeave:%v",
state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, state.AutoLeave,
"Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v",
state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext,
)
}

@ -26,7 +26,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "3.0.0"
Version = "3.4.0-rc.2"
Version = "3.4.0-rc.1"
APIVersion = "unknown"
// Git SHA Value will be set during build

Loading…
Cancel
Save