Fix stream sharding (#7393)

Promtail doesn't add a stream hash to push requests. As a result, we
weren't accounting for the current rate in our sharding calculation and
only sharding on individual large pushes.

This PR ensures that streams have a hash as they're ingested. It also
adds a couple of metrics that were useful in debugging this issue.
commit 8c84ee8b7a
parent 33bf36aa21
Author: Travis Patterson (committed by GitHub)
4 changed files (lines changed):

  pkg/distributor/distributor.go          36
  pkg/distributor/distributor_test.go     11
  pkg/distributor/ratestore.go            13
  pkg/distributor/ratestore_metrics.go     6
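
Before the diffs, an aside on the idea in the commit message: the heart of the fix is deriving a stable hash from a stream's sorted label set at ingest time, so every shard of a stream can later be grouped back under one shard-less hash. What follows is a minimal, self-contained sketch of that idea, not code from this PR; streamHash and its label-map input are invented for illustration, and xxhash merely stands in for whatever hash the production code uses.

package main

import (
	"fmt"
	"sort"
	"strings"

	"github.com/cespare/xxhash/v2"
)

// streamHash hashes a canonical (sorted) rendering of a label set.
// Hypothetical helper for illustration only; not Loki's implementation.
func streamHash(lbls map[string]string) uint64 {
	names := make([]string, 0, len(lbls))
	for name := range lbls {
		names = append(names, name)
	}
	sort.Strings(names) // canonical order so equal label sets hash equally

	var sb strings.Builder
	for _, name := range names {
		sb.WriteString(name)
		sb.WriteByte(0xff) // separator to avoid ambiguous concatenations
		sb.WriteString(lbls[name])
		sb.WriteByte(0xff)
	}
	return xxhash.Sum64String(sb.String())
}

func main() {
	fmt.Printf("%#x\n", streamHash(map[string]string{"job": "foo"}))
}

The test hunks below pin the hash of {job="foo"} to 0x8eeb87f5eb220480; that constant comes from Loki's real label-hashing code, not from this sketch.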

pkg/distributor/distributor.go

@@ -103,6 +103,7 @@ type Distributor struct {
 	ingesterAppendFailures *prometheus.CounterVec
 	replicationFactor      prometheus.Gauge
 	streamShardingFailures *prometheus.CounterVec
+	streamShardCount       prometheus.Counter
 }

 // New a distributor creates.
@@ -194,6 +195,11 @@ func New(
 		}, []string{
 			"reason",
 		}),
+		streamShardCount: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Namespace: "loki",
+			Name:      "stream_sharding_count",
+			Help:      "Total number of times the distributor has sharded streams",
+		}),
 	}

 	d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
 	rfStats.Set(int64(ingestersRing.ReplicationFactor()))
@@ -291,7 +297,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
 		// Truncate first so subsequent steps have consistent line lengths
 		d.truncateLines(validationContext, &stream)
-		stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
+		stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
 		if err != nil {
 			validationErr = err
 			validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, userID).Add(float64(len(stream.Entries)))
@@ -421,6 +427,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, streamSize int, userID string) ([]uint32, []streamTracker) {
 		return []uint32{util.TokenFor(userID, stream.Labels)}, []streamTracker{{stream: stream}}
 	}

+	d.streamShardCount.Inc()
 	if shardStreamsCfg.LoggingEnabled {
 		level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount)
 	}
@@ -578,22 +585,32 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
 	return err
 }

-func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, error) {
-	labelVal, ok := d.labelCache.Get(key)
-	if ok {
-		return labelVal.(string), nil
+type labelData struct {
+	labels string
+	hash   uint64
+}
+
+func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, uint64, error) {
+	if val, ok := d.labelCache.Get(key); ok {
+		labelVal := val.(labelData)
+		return labelVal.labels, labelVal.hash, nil
 	}

 	ls, err := syntax.ParseLabels(key)
 	if err != nil {
-		return "", httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
+		return "", 0, httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, key, err)
 	}

 	// ensure labels are correctly sorted.
 	if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
-		return "", err
+		return "", 0, err
 	}

 	lsVal := ls.String()
-	d.labelCache.Add(key, lsVal)
-	return lsVal, nil
+	lsHash := ls.Hash()
+	d.labelCache.Add(key, labelData{lsVal, lsHash})
+	return lsVal, lsHash, nil
 }

 // shardCountFor returns the right number of shards to be used by the given stream.
@@ -624,6 +641,7 @@ func (d *Distributor) shardCountFor(logger log.Logger, stream *logproto.Stream,
 		// 1 shard is enough for the given stream.
 		return 1
 	}
+
 	return shards
 }
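
An aside on the parseStreamLabels change above: the distributor memoizes label parsing per raw label string, so the hash has to be cached alongside the canonical labels, or every cache hit would hand back a stream with no hash. A rough sketch of that memoization pattern follows, with a plain map standing in for Loki's LRU cache and a stubbed parse function; only the labelData shape mirrors the diff.

package main

import "fmt"

// labelData mirrors the struct added in the diff; everything else here
// is a stand-in for illustration.
type labelData struct {
	labels string
	hash   uint64
}

type labelCache struct {
	entries map[string]labelData
}

// get returns the memoized canonical labels and hash, computing and
// caching both on a miss so later hits skip parsing and hashing.
func (c *labelCache) get(key string, parse func(string) (string, uint64, error)) (string, uint64, error) {
	if v, ok := c.entries[key]; ok {
		return v.labels, v.hash, nil
	}
	ls, h, err := parse(key)
	if err != nil {
		return "", 0, err
	}
	c.entries[key] = labelData{ls, h}
	return ls, h, nil
}

func main() {
	c := &labelCache{entries: map[string]labelData{}}
	stub := func(k string) (string, uint64, error) { return k, 0x2a, nil }
	ls, h, _ := c.get(`{job="foo"}`, stub)
	fmt.Println(ls, h) // {job="foo"} 42
}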

pkg/distributor/distributor_test.go

@@ -134,6 +134,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"},
@@ -159,6 +160,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"},
@@ -184,6 +186,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
@@ -209,6 +212,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"},
@@ -234,6 +238,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123456, 1), Line: "heyiiiiiii"},
@@ -259,6 +264,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
@@ -285,6 +291,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123456, 1), Line: "hi"},
@@ -312,6 +319,7 @@ func Test_IncrementTimestamp(t *testing.T) {
 		Streams: []logproto.Stream{
 			{
 				Labels: "{job=\"foo\"}",
+				Hash:   0x8eeb87f5eb220480,
 				Entries: []logproto.Entry{
 					{Timestamp: time.Unix(123456, 0), Line: "heyooooooo"},
 					{Timestamp: time.Unix(123456, 1), Line: "hi"},
@@ -693,6 +701,7 @@ func TestStreamShard(t *testing.T) {
 				rateStore:              &fakeRateStore{},
 				streamShardingFailures: shardingFailureMetric,
 				validator:              validator,
+				streamShardCount:       prometheus.NewCounter(prometheus.CounterOpts{}),
 			}

 			_, derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake")
@@ -792,7 +801,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
 	for n := 0; n < b.N; n++ {
 		stream := request.Streams[0]
 		stream.Labels = `{buzz="f", a="b"}`
-		_, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
+		_, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
 		if err != nil {
 			panic("parseStreamLabels fail,err:" + err.Error())
 		}

pkg/distributor/ratestore.go

@@ -124,19 +124,17 @@ func (s *rateStore) aggregateByShard(streamRates map[uint64]*logproto.StreamRate
 	shardCount := make(map[uint64]int)
 	rates := make(map[uint64]int64)

 	for _, sr := range streamRates {
+		shardCount[sr.StreamHashNoShard]++
 		if _, ok := rates[sr.StreamHashNoShard]; ok {
 			rates[sr.StreamHashNoShard] += sr.Rate
 			maxRate = max(rates[sr.StreamHashNoShard], maxRate)
-			shardCount[sr.StreamHashNoShard]++
 			continue
 		}

 		rates[sr.StreamHashNoShard] = sr.Rate
 		maxRate = max(rates[sr.StreamHashNoShard], maxRate)
-		shardCount[sr.StreamHashNoShard]++
 	}

 	var maxShards int64
@@ -169,7 +167,7 @@ func (s *rateStore) getRates(ctx context.Context, clients []ingesterClient) map[uint64]*logproto.StreamRate {
 	}
 	close(parallelClients)

-	return ratesPerStream(responses, len(clients))
+	return s.ratesPerStream(responses, len(clients))
 }

 func (s *rateStore) getRatesFromIngesters(ctx context.Context, clients chan ingesterClient, responses chan *logproto.StreamRatesResponse) {
@@ -187,7 +185,8 @@ func (s *rateStore) getRatesFromIngesters(ctx context.Context, clients chan ingesterClient, responses chan *logproto.StreamRatesResponse) {
 	}
 }

-func ratesPerStream(responses chan *logproto.StreamRatesResponse, totalResponses int) map[uint64]*logproto.StreamRate {
+func (s *rateStore) ratesPerStream(responses chan *logproto.StreamRatesResponse, totalResponses int) map[uint64]*logproto.StreamRate {
+	var maxRate int64
 	streamRates := make(map[uint64]*logproto.StreamRate)
 	for i := 0; i < totalResponses; i++ {
 		resp := <-responses
@@ -197,6 +196,7 @@ func ratesPerStream(responses chan *logproto.StreamRatesResponse, totalResponses int) map[uint64]*logproto.StreamRate {
 		for j := 0; j < len(resp.StreamRates); j++ {
 			rate := resp.StreamRates[j]
+			maxRate = max(maxRate, rate.Rate)

 			if r, ok := streamRates[rate.StreamHash]; ok {
 				if r.Rate < rate.Rate {
@@ -209,6 +209,7 @@ func ratesPerStream(responses chan *logproto.StreamRatesResponse, totalResponses int) map[uint64]*logproto.StreamRate {
 		}
 	}

+	s.metrics.maxUniqueStreamRate.Set(float64(maxRate))
 	return streamRates
 }
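
Why the missing hash broke sharding in the first place: the rate store aggregates per-shard rates under StreamHashNoShard, so when Promtail never set a hash, every shard of every stream landed in one zero-valued bucket and the per-stream rate feeding shardCountFor was meaningless. A simplified, self-contained sketch of that aggregation follows; the field names echo the diff, but the types and numbers are invented for the example.

package main

import "fmt"

// streamRate mirrors the two logproto.StreamRate fields used above.
type streamRate struct {
	streamHashNoShard uint64
	rate              int64
}

// aggregateByShard sums shard rates under the shard-less stream hash,
// the same keying the rate store relies on.
func aggregateByShard(in []streamRate) (rates map[uint64]int64, shards map[uint64]int) {
	rates = make(map[uint64]int64)
	shards = make(map[uint64]int)
	for _, sr := range in {
		shards[sr.streamHashNoShard]++
		rates[sr.streamHashNoShard] += sr.rate
	}
	return rates, shards
}

func main() {
	// Two shards of one stream plus one unsharded stream.
	in := []streamRate{{0x8eeb, 100}, {0x8eeb, 150}, {0xbeef, 10}}
	rates, shards := aggregateByShard(in)
	fmt.Println(rates, shards) // stream 0x8eeb: 250 across 2 shards; 0xbeef: 10

	// With the pre-fix bug, every hash is zero and the streams merge:
	broken := []streamRate{{0, 100}, {0, 150}, {0, 10}}
	rates, _ = aggregateByShard(broken)
	fmt.Println(rates) // map[0:260] — one meaningless bucket
}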

pkg/distributor/ratestore_metrics.go

@@ -11,6 +11,7 @@ type ratestoreMetrics struct {
 	streamCount         prometheus.Gauge
 	maxStreamShardCount prometheus.Gauge
 	maxStreamRate       prometheus.Gauge
+	maxUniqueStreamRate prometheus.Gauge
 	refreshDuration     *instrument.HistogramCollector
 }
@@ -36,6 +37,11 @@ func newRateStoreMetrics(reg prometheus.Registerer) *ratestoreMetrics {
 			Name:      "rate_store_max_stream_rate_bytes",
 			Help:      "The maximum stream rate for any stream reported by ingesters during a sync operation. Sharded Streams are combined.",
 		}),
+		maxUniqueStreamRate: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Namespace: "loki",
+			Name:      "rate_store_max_unique_stream_rate_bytes",
+			Help:      "The maximum stream rate for any stream reported by ingesters during a sync operation. Sharded Streams are considered separate.",
+		}),
 		refreshDuration: instrument.NewHistogramCollector(
 			promauto.With(reg).NewHistogramVec(
 				prometheus.HistogramOpts{
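
The two max-rate gauges differ only in whether a stream's shards are summed before taking the maximum, which is what makes them useful for spotting hash problems like this one. A hedged sketch of that distinction, with invented names and numbers:

package main

import "fmt"

// shardRate is illustrative only: one rate report per shard, keyed by
// the shard-less stream hash.
type shardRate struct {
	hashNoShard uint64
	rate        int64
}

func main() {
	reported := []shardRate{{0xabc, 400}, {0xabc, 600}, {0xdef, 300}}

	combined := map[uint64]int64{}
	var maxCombined, maxUnique int64
	for _, s := range reported {
		// what rate_store_max_stream_rate_bytes tracks: shards summed first
		combined[s.hashNoShard] += s.rate
		if combined[s.hashNoShard] > maxCombined {
			maxCombined = combined[s.hashNoShard]
		}
		// what rate_store_max_unique_stream_rate_bytes tracks: each report alone
		if s.rate > maxUnique {
			maxUnique = s.rate
		}
	}
	fmt.Println(maxCombined, maxUnique) // 1000 600
}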
