feat: flush not owned streams (#13254)

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
pull/13263/head
Vladyslav Diachenko 11 months ago committed by GitHub
parent f5a9905803
commit 2ca1ac66a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      pkg/ingester/flush.go
  2. 50
      pkg/ingester/flush_test.go
  3. 11
      pkg/ingester/instance.go
  4. 51
      pkg/ingester/owned_streams.go
  5. 58
      pkg/ingester/owned_streams_test.go
  6. 4
      pkg/ingester/recalculate_owned_streams_test.go

@ -33,11 +33,12 @@ const (
nameLabel = "__name__"
logsValue = "logs"
flushReasonIdle = "idle"
flushReasonMaxAge = "max_age"
flushReasonForced = "forced"
flushReasonFull = "full"
flushReasonSynced = "synced"
flushReasonIdle = "idle"
flushReasonMaxAge = "max_age"
flushReasonForced = "forced"
flushReasonNotOwned = "not_owned"
flushReasonFull = "full"
flushReasonSynced = "synced"
)
// Note: this is called both during the WAL replay (zero or more times)
@ -124,7 +125,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
lastChunk := stream.chunks[len(stream.chunks)-1]
shouldFlush, _ := i.shouldFlushChunk(&lastChunk)
if len(stream.chunks) == 1 && !immediate && !shouldFlush {
if len(stream.chunks) == 1 && !immediate && !shouldFlush && !instance.ownedStreamsSvc.isStreamNotOwned(stream.fp) {
return
}
@ -217,10 +218,14 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()
notOwnedStream := instance.ownedStreamsSvc.isStreamNotOwned(fp)
var result []*chunkDesc
for j := range stream.chunks {
shouldFlush, reason := i.shouldFlushChunk(&stream.chunks[j])
if !shouldFlush && notOwnedStream {
shouldFlush, reason = true, flushReasonNotOwned
}
if immediate || shouldFlush {
// Ensure no more writes happen to this chunk.
if !stream.chunks[j].closed {

@ -253,6 +253,56 @@ func TestFlushingCollidingLabels(t *testing.T) {
}
}
func Test_flush_not_owned_stream(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()
now := time.Unix(0, 0)
entries := []logproto.Entry{
{Timestamp: now.Add(time.Nanosecond), Line: "1"},
{Timestamp: now.Add(time.Minute), Line: "2"},
}
labelSet := model.LabelSet{"app": "l"}
req := &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: labelSet.String(), Entries: entries},
}}
const userID = "testUser"
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ing.Push(ctx, req)
require.NoError(t, err)
time.Sleep(2 * cfg.FlushCheckPeriod)
// ensure chunk is not flushed after flush period elapses
store.checkData(t, map[string][]logproto.Stream{})
instance, found := ing.getInstanceByID(userID)
require.True(t, found)
fingerprint := instance.getHashForLabels(labels.FromStrings("app", "l"))
require.Equal(t, model.Fingerprint(16794418009594958), fingerprint)
instance.ownedStreamsSvc.trackStreamOwnership(fingerprint, false)
time.Sleep(2 * cfg.FlushCheckPeriod)
// assert stream is now both batches
store.checkData(t, map[string][]logproto.Stream{
userID: {
{Labels: labelSet.String(), Entries: entries},
},
})
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}
func TestFlushMaxAge(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100

@ -357,7 +357,8 @@ func (i *instance) onStreamCreated(s *stream) {
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(s)
streamsCountStats.Add(1)
i.ownedStreamsSvc.incOwnedStreamCount()
// we count newly created stream as owned
i.ownedStreamsSvc.trackStreamOwnership(s.fp, true)
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "successfully created stream",
@ -421,7 +422,7 @@ func (i *instance) removeStream(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Dec()
memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String())))
streamsCountStats.Add(-1)
i.ownedStreamsSvc.decOwnedStreamCount()
i.ownedStreamsSvc.trackRemovedStream(s.fp)
}
}
@ -1181,11 +1182,7 @@ func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) error {
i.streams.WithLock(func() {
i.ownedStreamsSvc.resetStreamCounts()
err = i.streams.ForEach(func(s *stream) (bool, error) {
if ownedTokenRange.IncludesKey(uint32(s.fp)) {
i.ownedStreamsSvc.incOwnedStreamCount()
} else {
i.ownedStreamsSvc.incNotOwnedStreamCount()
}
i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp)))
return true, nil
})
})

@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/util/constants"
@ -17,19 +18,20 @@ var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{
}, []string{"tenant"})
type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32
ownedStreamCount int
notOwnedStreamCount int
lock sync.RWMutex
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32
ownedStreamCount int
lock sync.RWMutex
notOwnedStreams map[model.Fingerprint]any
}
func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
notOwnedStreams: make(map[model.Fingerprint]any),
}
svc.updateFixedLimit()
@ -51,25 +53,24 @@ func (s *ownedStreamService) getFixedLimit() int {
return int(s.fixedLimit.Load())
}
func (s *ownedStreamService) incOwnedStreamCount() {
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount++
}
func (s *ownedStreamService) incNotOwnedStreamCount() {
func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
s.lock.Lock()
defer s.lock.Unlock()
if owned {
s.ownedStreamCount++
return
}
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc()
s.notOwnedStreamCount++
s.notOwnedStreams[fp] = nil
}
func (s *ownedStreamService) decOwnedStreamCount() {
func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
s.lock.Lock()
defer s.lock.Unlock()
if s.notOwnedStreamCount > 0 {
if _, notOwned := s.notOwnedStreams[fp]; notOwned {
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec()
s.notOwnedStreamCount--
delete(s.notOwnedStreams, fp)
return
}
s.ownedStreamCount--
@ -79,6 +80,14 @@ func (s *ownedStreamService) resetStreamCounts() {
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount = 0
s.notOwnedStreamCount = 0
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0)
s.notOwnedStreams = make(map[model.Fingerprint]any)
}
func (s *ownedStreamService) isStreamNotOwned(fp model.Fingerprint) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, notOwned := s.notOwnedStreams[fp]
return notOwned
}

@ -4,6 +4,7 @@ import (
"sync"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/validation"
@ -28,51 +29,60 @@ func Test_OwnedStreamService(t *testing.T) {
service.updateFixedLimit()
require.Equal(t, 100, service.getFixedLimit())
service.incOwnedStreamCount()
service.incOwnedStreamCount()
service.incOwnedStreamCount()
service.trackStreamOwnership(model.Fingerprint(1), true)
service.trackStreamOwnership(model.Fingerprint(2), true)
service.trackStreamOwnership(model.Fingerprint(3), true)
require.Equal(t, 3, service.getOwnedStreamCount())
require.Len(t, service.notOwnedStreams, 0)
service.incOwnedStreamCount()
service.decOwnedStreamCount()
service.notOwnedStreamCount = 1
service.ownedStreamCount = 2
require.Equal(t, 2, service.getOwnedStreamCount())
require.Equal(t, 1, service.notOwnedStreamCount)
service.resetStreamCounts()
service.trackStreamOwnership(model.Fingerprint(3), true)
service.trackStreamOwnership(model.Fingerprint(3), false)
require.Equal(t, 1, service.getOwnedStreamCount(),
"owned streams count must not be changed because not owned stream can be reported only by recalculate_owned_streams job that resets the counters before checking all the streams")
require.Len(t, service.notOwnedStreams, 1)
require.True(t, service.isStreamNotOwned(model.Fingerprint(3)))
service.resetStreamCounts()
service.trackStreamOwnership(model.Fingerprint(1), true)
service.trackStreamOwnership(model.Fingerprint(2), true)
service.trackStreamOwnership(model.Fingerprint(3), false)
service.decOwnedStreamCount()
require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStreamCount is set to 0")
require.Equal(t, 0, service.notOwnedStreamCount)
service.trackRemovedStream(model.Fingerprint(3))
require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStream does not contain this fingerprint")
require.Len(t, service.notOwnedStreams, 0)
service.decOwnedStreamCount()
service.trackRemovedStream(model.Fingerprint(2))
require.Equal(t, 1, service.getOwnedStreamCount())
require.Equal(t, 0, service.notOwnedStreamCount, "notOwnedStreamCount must not be decremented lower than 0")
require.Len(t, service.notOwnedStreams, 0)
group := sync.WaitGroup{}
group.Add(200)
group.Add(100)
for i := 0; i < 100; i++ {
go func() {
go func(i int) {
defer group.Done()
service.incOwnedStreamCount()
}()
service.trackStreamOwnership(model.Fingerprint(i+1000), true)
}(i)
}
group.Wait()
group.Add(100)
for i := 0; i < 100; i++ {
go func() {
go func(i int) {
defer group.Done()
service.decOwnedStreamCount()
}()
service.trackRemovedStream(model.Fingerprint(i + 1000))
}(i)
}
group.Wait()
require.Equal(t, 1, service.getOwnedStreamCount(), "owned stream count must not be changed")
// simulate the effect from the recalculation job
service.notOwnedStreamCount = 1
service.ownedStreamCount = 2
service.trackStreamOwnership(model.Fingerprint(44), false)
service.trackStreamOwnership(model.Fingerprint(45), true)
service.resetStreamCounts()
require.Equal(t, 0, service.getOwnedStreamCount())
require.Equal(t, 0, service.notOwnedStreamCount)
require.Len(t, service.notOwnedStreams, 0)
}

@ -96,7 +96,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) {
createStream(t, tenant, 250)
require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, 0, tenant.ownedStreamsSvc.notOwnedStreamCount)
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)
mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}
@ -110,7 +110,7 @@ func Test_recalculateOwnedStreams_recalculate(t *testing.T) {
require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
}
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, testData.expectedNotOwnedStreamCount, tenant.ownedStreamsSvc.notOwnedStreamCount)
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
})
}

Loading…
Cancel
Save