ingester.max-chunk-age (#1558)

* ingester.max-chunk-age

* bumps helm & changelog

* adjusts max-chunk-age test

* bumps wait period to let ci catch up

* updates changelog
pull/1578/head
Owen Diehl 6 years ago committed by Ed Welch
parent bde0389979
commit 5026dfea57
  1. 7
      CHANGELOG.md
  2. 4
      docs/configuration/README.md
  3. 7
      pkg/ingester/flush.go
  4. 53
      pkg/ingester/flush_test.go
  5. 2
      pkg/ingester/ingester.go

@ -1,3 +1,9 @@
## master / unreleased
### Features
* [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut.
## 1.3.0 (2019-01-16)
### What's New?? ###
@ -156,7 +162,6 @@ Once again we can't thank our community and contributors enough for the signific
#### New Members!
* [1415](https://github.com/grafana/loki/pull/1415) **cyriltovena**: Add Joe as member of the team.
# 1.2.0 (2019-12-09)
One week has passed since the last Loki release, and it's time for a new one!

@ -300,6 +300,10 @@ The `ingester_config` block configures Ingesters.
# The maximum number of errors a stream will report to the user
# when a push fails. 0 to make unlimited.
[max_returned_stream_errors: <int> | default = 10]
# The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this the current chunk will be flushed to the store and a new chunk created.
[max_chunk_age: <duration> | default = 1h]
```
### lifecycler_config

@ -146,7 +146,7 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}
lastChunk := stream.chunks[len(stream.chunks)-1]
if len(stream.chunks) == 1 && time.Since(lastChunk.lastUpdated) < i.cfg.MaxChunkIdle && !immediate {
if len(stream.chunks) == 1 && !immediate && !i.shouldFlushChunk(&lastChunk) {
return
}
@ -246,7 +246,10 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
}
if time.Since(chunk.lastUpdated) > i.cfg.MaxChunkIdle {
chunk.closed = true
return true
}
if from, to := chunk.chunk.Bounds(); to.Sub(from) > i.cfg.MaxChunkAge {
return true
}

@ -102,6 +102,59 @@ func TestFlushingCollidingLabels(t *testing.T) {
}
}
func TestFlushMaxAge(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = time.Millisecond * 100
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour
store, ing := newTestStore(t, cfg)
defer store.Stop()
now := time.Unix(0, 0)
firstEntries := []logproto.Entry{
{Timestamp: now.Add(time.Nanosecond), Line: "1"},
{Timestamp: now.Add(time.Minute), Line: "2"},
}
secondEntries := []logproto.Entry{
{Timestamp: now.Add(time.Second * 61), Line: "3"},
}
req := &logproto.PushRequest{Streams: []*logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries},
}}
const userID = "testUser"
ctx := user.InjectOrgID(context.Background(), userID)
_, err := ing.Push(ctx, req)
require.NoError(t, err)
time.Sleep(2 * cfg.FlushCheckPeriod)
// ensure chunk is not flushed after flush period elapses
store.checkData(t, map[string][]*logproto.Stream{})
req2 := &logproto.PushRequest{Streams: []*logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries},
}}
_, err = ing.Push(ctx, req2)
require.NoError(t, err)
time.Sleep(2 * cfg.FlushCheckPeriod)
// assert stream is now both batches
store.checkData(t, map[string][]*logproto.Stream{
userID: []*logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)},
},
})
}
type testStore struct {
mtx sync.Mutex
// Chunks keyed by userID.

@ -51,6 +51,7 @@ type Config struct {
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
SyncPeriod time.Duration `yaml:"sync_period"`
@ -78,6 +79,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 0, "How often to cut chunks to synchronize ingesters.")
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
}
// Ingester builds chunks for incoming log streams.

Loading…
Cancel
Save