Loki: Implement stream sharding (#6952)

* introduce 'StreamSharder' interface to decouple sharding from managing shards

* make the streamsharder take a whole stream instead of just an ID

* Implement stream sharding usage.

* move streamShard to distributor

* Implement the stream sharder and shard iter (#6941)

* Modify sharding to not duplicate all entries for all shards (#6942)

* Modify sharding to not duplicate all entries for all shards.

* Rename ShardIter and get rid of NextShardId method.

* Get rid of ShardIter and ShardStats.

* Implement a StreamSharder mock and use it in tests.

* Rewrite stubbing and remove flags.

* Add auto-splitting.

* Only shard stream when appropriate.

* Make sharding mode an enum.

* Bench shardStream function.

* Modify shard comparison to be <= 1.

* Add a guard clause for when lowerbound > upperbound.

* Apply suggestions from code review

Avoid using `fmt.Sprintf`.

Co-authored-by: Danny Kopping <danny.kopping@grafana.com>

* Update pkg/distributor/distributor.go

Rename metric name.

Co-authored-by: Danny Kopping <danny.kopping@grafana.com>

* Avoid double-negative, per Danny's suggestion.

* Move TODO to its right place.

* Log scenario where upperbound < lowerbound.

* Reuse compiled regex.

* Add docstring explaining why we use max(shards*2, 2).

* make sharder use a RWMutex

* simplify sharding config

* replace doc string

* Revert "Add auto-splitting" for standalone PR.

This reverts commit e3886a6210.

* shard loop refactor

* move shard creation into its own function

* fix after refactor

* make config more explicit and document it

* reduce allocations

* review feedback

Co-authored-by: Travis Patterson <travis.patterson@grafana.com>
Co-authored-by: Danny Kopping <danny.kopping@grafana.com>
Dylan Guedes committed 3 years ago (via GitHub)
commit 5d8c48bd10, parent 15f8f42295
Files changed:
1. docs/sources/configuration/_index.md (14)
2. pkg/distributor/distributor.go (142)
3. pkg/distributor/distributor_test.go (324)
4. pkg/distributor/streamsharder.go (49)
5. pkg/distributor/streamsharder_test.go (73)
6. pkg/loki/modules.go (9)

@@ -306,6 +306,20 @@ ring:
# reading and writing.
# CLI flag: -distributor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Configures the distributor to shard streams that are too big.
shard_streams:
  # Whether to enable stream sharding.
  #
  # CLI flag: -distributor.stream-sharding.enabled
  [enabled: <boolean> | default = false]

  # Whether to log when a stream is sharded. Disabled by default because
  # logging on the write path may impact performance. When disabled, stream
  # sharding emits no logs regardless of log level.
  #
  # CLI flag: -distributor.stream-sharding.logging-enabled
  [logging_enabled: <boolean> | default = false]
```
## querier
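
For example, a Loki configuration that turns sharding on while leaving the (potentially expensive) logging off could include the following excerpt (hypothetical; all other distributor settings elided):

```yaml
distributor:
  shard_streams:
    enabled: true
    logging_enabled: false
```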

@@ -4,8 +4,13 @@ import (
"context"
"flag"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@@ -33,6 +38,11 @@ import (
)
const (
// ShardLbName is the internal label to be used by Loki when dividing a stream into smaller pieces.
// Possible values are only increasing integers starting from 0.
ShardLbName = "__stream_shard__"
ShardLbPlaceholder = "__placeholder__"
ringKey = "distributor"
)
@@ -41,6 +51,11 @@ var (
rfStats = usagestats.NewInt("distributor_replication_factor")
)
type ShardStreamsConfig struct {
Enabled bool `yaml:"enabled"`
LoggingEnabled bool `yaml:"logging_enabled"`
}
// Config for a Distributor.
type Config struct {
// Distributors ring
@@ -48,11 +63,22 @@ type Config struct {
// For testing.
factory ring_client.PoolFactory `yaml:"-"`
// ShardStreams configures whether big streams should be sharded or not.
ShardStreams ShardStreamsConfig `yaml:"shard_streams"`
}
// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(fs)
fs.BoolVar(&cfg.ShardStreams.Enabled, "distributor.stream-sharding.enabled", false, "Automatically shard streams to keep them under the per-stream rate limit")
fs.BoolVar(&cfg.ShardStreams.LoggingEnabled, "distributor.stream-sharding.logging-enabled", false, "Enable logging when sharding streams")
}
// StreamSharder manages the state necessary to shard streams.
type StreamSharder interface {
ShardCountFor(stream logproto.Stream) (int, bool)
IncreaseShardsFor(stream logproto.Stream)
}
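
For illustration, here is a minimal sketch of a hypothetical implementation that never shards (not part of this change). It shows the contract: ShardCountFor returns a shard count plus an ok flag, and a false ok sends the stream down the unsharded path in Push below:

```go
// noopStreamSharder is a hypothetical StreamSharder that never shards.
type noopStreamSharder struct{}

// ShardCountFor reports ok=false, so callers fall back to the single-stream path.
func (noopStreamSharder) ShardCountFor(stream logproto.Stream) (int, bool) { return 0, false }

// IncreaseShardsFor is a no-op: the shard count never grows.
func (noopStreamSharder) IncreaseShardsFor(stream logproto.Stream) {}
```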
// Distributor coordinates replication and distribution of log streams.
@@ -66,6 +92,7 @@ type Distributor struct {
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
streamSharder StreamSharder
// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
@@ -75,11 +102,9 @@ type Distributor struct {
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
labelCache *lru.Cache
// metrics
ingesterAppends *prometheus.CounterVec
ingesterAppendFailures *prometheus.CounterVec
@@ -87,7 +112,14 @@ }
}
// New creates a distributor.
func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) {
func New(
cfg Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
overrides *validation.Overrides,
registerer prometheus.Registerer,
) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
@@ -167,6 +199,8 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
d.subservicesWatcher.WatchManager(d.subservices)
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
d.streamSharder = NewStreamSharder()
return &d, nil
}
@@ -276,8 +310,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
stream.Entries = stream.Entries[:n]
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{stream: stream})
if d.cfg.ShardStreams.Enabled {
derivedKeys, derivedStreams := d.shardStream(stream, userID)
keys = append(keys, derivedKeys...)
streams = append(streams, derivedStreams...)
} else {
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{stream: stream})
}
}
// Return early if none of the streams contained entries
@@ -339,6 +379,98 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}
func min(x1, x2 int) int {
if x1 < x2 {
return x1
}
return x2
}
// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.
func (d *Distributor) shardStream(stream logproto.Stream, userID string) ([]uint32, []streamTracker) {
shardCount, ok := d.streamSharder.ShardCountFor(stream)
if !ok || shardCount <= 1 {
return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}}
}
if d.cfg.ShardStreams.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "sharding request", "stream", stream.Labels)
}
streamLabels := labelTemplate(stream.Labels)
streamPattern := streamLabels.String()
derivedKeys := make([]uint32, 0, shardCount)
derivedStreams := make([]streamTracker, 0, shardCount)
for i := 0; i < shardCount; i++ {
shard, ok := d.createShard(stream, streamLabels, streamPattern, shardCount, i)
if !ok {
continue
}
derivedKeys = append(derivedKeys, util.TokenFor(userID, shard.Labels))
derivedStreams = append(derivedStreams, streamTracker{stream: shard})
if d.cfg.ShardStreams.LoggingEnabled {
level.Info(util_log.Logger).Log("msg", "stream derived from sharding", "src-stream", stream.Labels, "derived-stream", shard.Labels)
}
}
return derivedKeys, derivedStreams
}
// labelTemplate returns a label set that includes a dummy shard label to be replaced later.
// To avoid allocations, this slice is reused once the shard value is known.
func labelTemplate(lbls string) labels.Labels {
baseLbls, err := syntax.ParseLabels(lbls)
if err != nil {
level.Error(util_log.Logger).Log("msg", "couldn't extract labels from stream", "stream", lbls)
return nil
}
streamLabels := make([]labels.Label, len(baseLbls)+1)
for i := 0; i < len(baseLbls); i++ {
streamLabels[i] = baseLbls[i]
}
streamLabels[len(baseLbls)] = labels.Label{Name: ShardLbName, Value: ShardLbPlaceholder}
return streamLabels
}
func (d *Distributor) createShard(stream logproto.Stream, lbls labels.Labels, streamPattern string, totalShards, shardNumber int) (logproto.Stream, bool) {
lowerBound, upperBound, ok := d.boundsFor(stream, totalShards, shardNumber)
if !ok {
return logproto.Stream{}, false
}
shardLabel := strconv.Itoa(shardNumber)
lbls[len(lbls)-1] = labels.Label{Name: ShardLbName, Value: shardLabel}
return logproto.Stream{
Labels: strings.Replace(streamPattern, ShardLbPlaceholder, shardLabel, 1),
Hash: lbls.Hash(),
Entries: stream.Entries[lowerBound:upperBound],
}, true
}
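
labelTemplate and createShard together implement a small optimization: the label string is serialized once with a placeholder shard value, and each shard's labels are then derived with a single strings.Replace instead of re-parsing and re-serializing the label set per shard. A self-contained sketch of the idea, using plain strings rather than Loki's label types:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const (
	shardLbName        = "__stream_shard__"
	shardLbPlaceholder = "__placeholder__"
)

func main() {
	// Build the pattern once: the stream's labels plus a placeholder shard label.
	pattern := fmt.Sprintf(`{app="myapp", job="fizzbuzz", %s="%s"}`, shardLbName, shardLbPlaceholder)

	// Deriving each shard's label string is then a single substring replacement.
	for shard := 0; shard < 3; shard++ {
		fmt.Println(strings.Replace(pattern, shardLbPlaceholder, strconv.Itoa(shard), 1))
	}
	// Output:
	// {app="myapp", job="fizzbuzz", __stream_shard__="0"}
	// {app="myapp", job="fizzbuzz", __stream_shard__="1"}
	// {app="myapp", job="fizzbuzz", __stream_shard__="2"}
}
```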
func (d *Distributor) boundsFor(stream logproto.Stream, totalShards, shardNumber int) (int, int, bool) {
entriesPerWindow := float64(len(stream.Entries)) / float64(totalShards)
fIdx := float64(shardNumber)
lowerBound := int(fIdx * entriesPerWindow)
upperBound := min(int(entriesPerWindow*(1+fIdx)), len(stream.Entries))
if lowerBound > upperBound {
if d.cfg.ShardStreams.LoggingEnabled {
level.Warn(util_log.Logger).Log("msg", "sharding with lowerbound > upperbound", "lowerbound", lowerBound, "upperbound", upperBound, "shards", totalShards, "labels", stream.Labels)
}
return 0, 0, false
}
return lowerBound, upperBound, true
}
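
To make the windowing arithmetic concrete: with 5 entries and 2 shards, entriesPerWindow is 2.5, so shard 0 receives entries [0, 2) and shard 1 receives entries [2, 5), matching the "two shards with 5 entries" test case below. A standalone sketch of the same computation (guard clause and logging omitted):

```go
package main

import "fmt"

// boundsFor mirrors the distributor's windowing arithmetic in sketch form.
func boundsFor(totalEntries, totalShards, shardNumber int) (lower, upper int) {
	entriesPerWindow := float64(totalEntries) / float64(totalShards)
	fIdx := float64(shardNumber)
	lower = int(fIdx * entriesPerWindow)
	upper = int(entriesPerWindow * (1 + fIdx))
	if upper > totalEntries {
		upper = totalEntries
	}
	return lower, upper
}

func main() {
	for shard := 0; shard < 2; shard++ {
		lo, hi := boundsFor(5, 2, shard)
		fmt.Printf("shard %d gets entries [%d, %d)\n", shard, lo, hi)
	}
	// Output:
	// shard 0 gets entries [0, 2)
	// shard 1 gets entries [2, 5)
}
```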
// maxT returns the highest between two given timestamps.
func maxT(t1, t2 time.Time) time.Time {
if t1.Before(t2) {

@@ -20,6 +20,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
@@ -29,6 +30,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
fe "github.com/grafana/loki/pkg/util/flagext"
loki_net "github.com/grafana/loki/pkg/util/net"
@@ -373,6 +375,328 @@ func Test_TruncateLogLines(t *testing.T) {
})
}
func TestStreamShard(t *testing.T) {
// setup base stream.
baseStream := logproto.Stream{}
baseLabels := "{app='myapp', job='fizzbuzz'}"
lbs, err := syntax.ParseLabels(baseLabels)
require.NoError(t, err)
baseStream.Hash = lbs.Hash()
baseStream.Labels = lbs.String()
// helper funcs
generateEntries := func(n int) []logproto.Entry {
var entries []logproto.Entry
for i := 0; i < n; i++ {
entries = append(entries, logproto.Entry{
Line: fmt.Sprintf("log line %d", i),
Timestamp: time.Now(),
})
}
return entries
}
generateShardLabels := func(baseLabels string, idx int) labels.Labels {
// append a shard label to the given labels. The shard value will be 'idx'.
lbs, err := syntax.ParseLabels(baseLabels)
require.NoError(t, err)
lbs = append(lbs, labels.Label{Name: ShardLbName, Value: fmt.Sprintf("%d", idx)})
return lbs
}
totalEntries := generateEntries(100)
for _, tc := range []struct {
name string
entries []logproto.Entry
shards int // stub call to ShardCountFor.
wantDerivedStream []streamTracker
}{
{
name: "one shard with no entries",
entries: nil,
shards: 1,
wantDerivedStream: []streamTracker{{stream: baseStream}},
},
{
name: "one shard with one entry",
shards: 1,
entries: totalEntries[0:1],
wantDerivedStream: []streamTracker{
{
stream: logproto.Stream{
Entries: []logproto.Entry{totalEntries[0]},
Labels: baseStream.Labels,
Hash: baseStream.Hash,
},
},
},
},
{
name: "two shards with 3 entries",
shards: 2,
entries: totalEntries[0:3],
wantDerivedStream: []streamTracker{
{ // shard 1.
stream: logproto.Stream{
Entries: totalEntries[0:1],
Labels: generateShardLabels(baseLabels, 0).String(),
Hash: generateShardLabels(baseLabels, 0).Hash(),
},
}, // shard 2.
{
stream: logproto.Stream{
Entries: totalEntries[1:3],
Labels: generateShardLabels(baseLabels, 1).String(),
Hash: generateShardLabels(baseLabels, 1).Hash(),
},
},
},
},
{
name: "two shards with 5 entries",
shards: 2,
entries: totalEntries[0:5],
wantDerivedStream: []streamTracker{
{ // shard 1.
stream: logproto.Stream{
Entries: totalEntries[0:2],
Labels: generateShardLabels(baseLabels, 0).String(),
Hash: generateShardLabels(baseLabels, 0).Hash(),
},
}, // shard 2.
{
stream: logproto.Stream{
Entries: totalEntries[2:5],
Labels: generateShardLabels(baseLabels, 1).String(),
Hash: generateShardLabels(baseLabels, 1).Hash(),
},
},
},
},
{
name: "one shard with 20 entries",
shards: 1,
entries: totalEntries[0:20],
wantDerivedStream: []streamTracker{
{ // shard 1.
stream: logproto.Stream{
Entries: totalEntries[0:20],
Labels: baseStream.Labels,
Hash: baseStream.Hash,
},
},
},
},
{
name: "two shards with 20 entries",
shards: 2,
entries: totalEntries[0:20],
wantDerivedStream: []streamTracker{
{ // shard 1.
stream: logproto.Stream{
Entries: totalEntries[0:10],
Labels: generateShardLabels(baseLabels, 0).String(),
Hash: generateShardLabels(baseLabels, 0).Hash(),
},
}, // shard 2.
{
stream: logproto.Stream{
Entries: totalEntries[10:20],
Labels: generateShardLabels(baseLabels, 1).String(),
Hash: generateShardLabels(baseLabels, 1).Hash(),
},
},
},
},
{
name: "four shards with 20 entries",
shards: 4,
entries: totalEntries[0:20],
wantDerivedStream: []streamTracker{
{ // shard 1.
stream: logproto.Stream{
Entries: totalEntries[0:5],
Labels: generateShardLabels(baseLabels, 0).String(),
Hash: generateShardLabels(baseLabels, 0).Hash(),
},
},
{ // shard 2.
stream: logproto.Stream{
Entries: totalEntries[5:10],
Labels: generateShardLabels(baseLabels, 1).String(),
Hash: generateShardLabels(baseLabels, 1).Hash(),
},
},
{ // shard 3.
stream: logproto.Stream{
Entries: totalEntries[10:15],
Labels: generateShardLabels(baseLabels, 2).String(),
Hash: generateShardLabels(baseLabels, 2).Hash(),
},
},
{ // shard 4.
stream: logproto.Stream{
Entries: totalEntries[15:20],
Labels: generateShardLabels(baseLabels, 3).String(),
Hash: generateShardLabels(baseLabels, 3).Hash(),
},
},
},
},
{
name: "four shards with 2 entries",
shards: 4,
entries: totalEntries[0:2],
wantDerivedStream: []streamTracker{
{
stream: logproto.Stream{
Entries: []logproto.Entry{},
Labels: generateShardLabels(baseLabels, 0).String(),
Hash: generateShardLabels(baseLabels, 0).Hash(),
},
},
{
stream: logproto.Stream{
Entries: totalEntries[0:1],
Labels: generateShardLabels(baseLabels, 1).String(),
Hash: generateShardLabels(baseLabels, 1).Hash(),
},
},
{
stream: logproto.Stream{
Entries: []logproto.Entry{},
Labels: generateShardLabels(baseLabels, 2).String(),
Hash: generateShardLabels(baseLabels, 2).Hash(),
},
},
{
stream: logproto.Stream{
Entries: totalEntries[1:2],
Labels: generateShardLabels(baseLabels, 3).String(),
Hash: generateShardLabels(baseLabels, 3).Hash(),
},
},
},
},
{
name: "four shards with 1 entry",
shards: 4,
entries: totalEntries[0:1],
wantDerivedStream: []streamTracker{
{
stream: logproto.Stream{
Labels: generateShardLabels(baseLabels, 0).String(),
Hash: generateShardLabels(baseLabels, 0).Hash(),
Entries: []logproto.Entry{},
},
},
{
stream: logproto.Stream{
Labels: generateShardLabels(baseLabels, 1).String(),
Hash: generateShardLabels(baseLabels, 1).Hash(),
Entries: []logproto.Entry{},
},
},
{
stream: logproto.Stream{
Labels: generateShardLabels(baseLabels, 2).String(),
Hash: generateShardLabels(baseLabels, 2).Hash(),
Entries: []logproto.Entry{},
},
},
{
stream: logproto.Stream{
Entries: totalEntries[0:1],
Labels: generateShardLabels(baseLabels, 3).String(),
Hash: generateShardLabels(baseLabels, 3).Hash(),
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
d := Distributor{
streamSharder: NewStreamSharderMock(tc.shards),
}
baseStream.Entries = tc.entries
_, derivedStreams := d.shardStream(baseStream, "fake")
require.Equal(t, tc.wantDerivedStream, derivedStreams)
})
}
}
func BenchmarkShardStream(b *testing.B) {
stream := logproto.Stream{}
labels := "{app='myapp', job='fizzbuzz'}"
lbs, err := syntax.ParseLabels(labels)
require.NoError(b, err)
stream.Hash = lbs.Hash()
stream.Labels = lbs.String()
// helper funcs
generateEntries := func(n int) []logproto.Entry {
var entries []logproto.Entry
for i := 0; i < n; i++ {
entries = append(entries, logproto.Entry{
Line: fmt.Sprintf("log line %d", i),
Timestamp: time.Now(),
})
}
return entries
}
allEntries := generateEntries(25000)
b.Run("high number of entries, low number of shards", func(b *testing.B) {
d := Distributor{
streamSharder: NewStreamSharderMock(2),
}
stream.Entries = allEntries
b.ResetTimer()
for n := 0; n < b.N; n++ {
d.shardStream(stream, "fake")
}
})
b.Run("low number of entries, low number of shards", func(b *testing.B) {
d := Distributor{
streamSharder: NewStreamSharderMock(2),
}
stream.Entries = nil
b.ResetTimer()
for n := 0; n < b.N; n++ {
d.shardStream(stream, "fake")
}
})
b.Run("high number of entries, high number of shards", func(b *testing.B) {
d := Distributor{
streamSharder: NewStreamSharderMock(64),
}
stream.Entries = allEntries
b.ResetTimer()
for n := 0; n < b.N; n++ {
d.shardStream(stream, "fake")
}
})
b.Run("low number of entries, high number of shards", func(b *testing.B) {
d := Distributor{
streamSharder: NewStreamSharderMock(64),
}
stream.Entries = nil
b.ResetTimer()
for n := 0; n < b.N; n++ {
d.shardStream(stream, "fake")
}
})
}
func Benchmark_SortLabelsOnPush(b *testing.B) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

@@ -0,0 +1,49 @@
package distributor
import (
"sync"
"github.com/grafana/loki/pkg/logproto"
)
type streamSharder struct {
mu sync.RWMutex
streams map[string]int
}
func NewStreamSharder() StreamSharder {
return &streamSharder{
streams: make(map[string]int),
}
}
func (s *streamSharder) ShardCountFor(stream logproto.Stream) (int, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
shards := s.streams[stream.Labels]
if shards > 0 {
return shards, true
}
return 0, false
}
// IncreaseShardsFor doubles the number of shards for the given stream.
func (s *streamSharder) IncreaseShardsFor(stream logproto.Stream) {
s.mu.Lock()
defer s.mu.Unlock()
shards := s.streams[stream.Labels]
// Since the number of shards of a stream that is being sharded for the first time is 0,
// we assign to it shards = max(shards*2, 2) such that its number of shards will be no less than 2.
s.streams[stream.Labels] = max(shards*2, 2)
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
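
Because of the max(shards*2, 2) rule, successive IncreaseShardsFor calls grow a stream's shard count as 2, 4, 8, and so on. A sketch of an example test that could sit alongside this file (ExampleStreamSharder is hypothetical, not part of the change):

```go
package distributor

import (
	"fmt"

	"github.com/grafana/loki/pkg/logproto"
)

// ExampleStreamSharder demonstrates the doubling policy: each call to
// IncreaseShardsFor doubles the stream's shard count, starting at 2.
func ExampleStreamSharder() {
	sharder := NewStreamSharder()
	stream := logproto.Stream{Labels: `{app="myapp"}`}

	for i := 0; i < 3; i++ {
		sharder.IncreaseShardsFor(stream)
		shards, _ := sharder.ShardCountFor(stream)
		fmt.Println(shards)
	}
	// Output:
	// 2
	// 4
	// 8
}
```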

@@ -0,0 +1,73 @@
package distributor
import (
"testing"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)
func TestStreamSharder(t *testing.T) {
stream := logproto.Stream{Entries: make([]logproto.Entry, 11), Labels: "test-stream"}
stream2 := logproto.Stream{Entries: make([]logproto.Entry, 11), Labels: "test-stream-2"}
t.Run("it returns not ok when a stream should not be sharded", func(t *testing.T) {
sharder := NewStreamSharder()
shards, ok := sharder.ShardCountFor(stream)
require.Equal(t, shards, 0)
require.False(t, ok)
})
t.Run("it keeps track of multiple streams", func(t *testing.T) {
sharder := NewStreamSharder()
sharder.IncreaseShardsFor(stream)
sharder.IncreaseShardsFor(stream)
sharder.IncreaseShardsFor(stream2)
shards, ok := sharder.ShardCountFor(stream)
require.True(t, ok)
require.Equal(t, 4, shards)
shards, ok = sharder.ShardCountFor(stream2)
require.True(t, ok)
require.Equal(t, 2, shards)
})
}
type StreamSharderMock struct {
calls map[string]int
wantShards int
}
func NewStreamSharderMock(shards int) *StreamSharderMock {
return &StreamSharderMock{
calls: make(map[string]int),
wantShards: shards,
}
}
func (s *StreamSharderMock) IncreaseShardsFor(stream logproto.Stream) {
s.increaseCallsFor("IncreaseShardsFor")
}
func (s *StreamSharderMock) ShardCountFor(stream logproto.Stream) (int, bool) {
s.increaseCallsFor("ShardCountFor")
if s.wantShards < 0 {
return 0, false
}
return s.wantShards, true
}
func (s *StreamSharderMock) increaseCallsFor(funcName string) {
if _, ok := s.calls[funcName]; ok {
s.calls[funcName]++
return
}
s.calls[funcName] = 1
}

@@ -260,7 +260,14 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) {
func (t *Loki) initDistributor() (services.Service, error) {
var err error
t.distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer)
t.distributor, err = distributor.New(
t.Cfg.Distributor,
t.Cfg.IngesterClient,
t.tenantConfigs,
t.ring,
t.overrides,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}
