Fix Loki Chunks Dashboard (#1126)

This commit cleans up the Loki Chunks dashboard by renaming incorrectly
named series and adding the metrics it needs.

Added new metrics:

  - loki_ingester_memory_chunks
  - loki_ingester_memory_streams 
  - loki_ingester_chunk_utilization
  - loki_ingester_chunk_compression_ratio

The following metrics were already being published with a different name. 
Fixed in the dashboard:

  - cortex_chunk_store_index_entries_per_chunk
  - cortex_ingester_flush_queue_length
  - loki_ingester_chunk_entries
pull/1132/head
Joe Elliott 7 years ago committed by Robert Fratto
parent 2477f70431
commit b835e2082a
  1. 36
      docs/operations/observability.md
  2. 10
      pkg/chunkenc/dumb_chunk.go
  3. 16
      pkg/chunkenc/facade.go
  4. 34
      pkg/chunkenc/gzip.go
  5. 2
      pkg/chunkenc/interface.go
  6. 31
      pkg/ingester/flush.go
  7. 16
      pkg/ingester/instance.go
  8. 6
      pkg/ingester/stream.go
  9. 8
      production/loki-mixin/dashboards.libsonnet

@ -24,22 +24,26 @@ The Loki Distributors expose the following metrics:
The Loki Ingesters expose the following metrics:
| Metric Name | Metric Type | Description |
| ----------------------------------------- | ----------- | ------------------------------------------------------------------------------------------- |
| `cortex_ingester_flush_queue_length` | Gauge | The total number of series pending in the flush queue. |
| `loki_ingester_chunk_age_seconds` | Histogram | Distribution of chunk ages when flushed. |
| `loki_ingester_chunk_encode_time_seconds` | Histogram | Distribution of chunk encode times. |
| `loki_ingester_chunk_entries` | Histogram | Distribution of entries per-chunk when flushed. |
| `loki_ingester_chunk_size_bytes` | Histogram | Distribution of chunk sizes when flushed. |
| `loki_ingester_chunk_stored_bytes_total` | Counter | Total bytes stored in chunks per tenant. |
| `loki_ingester_chunks_created_total` | Counter | The total number of chunks created in the ingester. |
| `loki_ingester_chunks_flushed_total` | Counter | The total number of chunks flushed by the ingester. |
| `loki_ingester_chunks_stored_total` | Counter | Total stored chunks per tenant. |
| `loki_ingester_received_chunks` | Counter | The total number of chunks sent by this ingester whilst joining during the handoff process. |
| `loki_ingester_samples_per_chunk` | Histogram | The number of samples in a chunk. |
| `loki_ingester_sent_chunks` | Counter | The total number of chunks sent by this ingester whilst leaving during the handoff process. |
| `loki_ingester_streams_created_total` | Counter | The total number of streams created per tenant. |
| `loki_ingester_streams_removed_total` | Counter | The total number of streams removed per tenant. |
| Metric Name | Metric Type | Description |
| -------------------------------------------- | ----------- | ------------------------------------------------------------------------------------------- |
| `cortex_ingester_flush_queue_length` | Gauge | The total number of series pending in the flush queue. |
| `cortex_chunk_store_index_entries_per_chunk` | Histogram | Number of index entries written to storage per chunk. |
| `loki_ingester_memory_chunks` | Gauge | The total number of chunks in memory. |
| `loki_ingester_memory_streams` | Gauge | The total number of streams in memory. |
| `loki_ingester_chunk_age_seconds` | Histogram | Distribution of chunk ages when flushed. |
| `loki_ingester_chunk_encode_time_seconds` | Histogram | Distribution of chunk encode times. |
| `loki_ingester_chunk_entries` | Histogram | Distribution of lines per-chunk when flushed. |
| `loki_ingester_chunk_size_bytes` | Histogram | Distribution of chunk sizes when flushed. |
| `loki_ingester_chunk_utilization` | Histogram | Distribution of chunk utilization (filled uncompressed bytes vs maximum uncompressed bytes) when flushed. |
| `loki_ingester_chunk_compression_ratio` | Histogram | Distribution of chunk compression ratio when flushed. |
| `loki_ingester_chunk_stored_bytes_total` | Counter | Total bytes stored in chunks per tenant. |
| `loki_ingester_chunks_created_total` | Counter | The total number of chunks created in the ingester. |
| `loki_ingester_chunks_stored_total` | Counter | Total stored chunks per tenant. |
| `loki_ingester_received_chunks` | Counter | The total number of chunks sent by this ingester whilst joining during the handoff process. |
| `loki_ingester_samples_per_chunk` | Histogram | The number of samples in a chunk. |
| `loki_ingester_sent_chunks` | Counter | The total number of chunks sent by this ingester whilst leaving during the handoff process. |
| `loki_ingester_streams_created_total` | Counter | The total number of streams created per tenant. |
| `loki_ingester_streams_removed_total` | Counter | The total number of streams removed per tenant. |
Promtail exposes these metrics:

@ -50,6 +50,16 @@ func (c *dumbChunk) Size() int {
return len(c.entries)
}
// UncompressedSize implements Chunk. The dumb chunk applies no compression,
// so the uncompressed size is simply its Size().
func (c *dumbChunk) UncompressedSize() int {
return c.Size()
}
// Utilization implements Chunk. It reports the fraction of the chunk's entry
// capacity (tmpNumEntries) currently in use.
// NOTE(review): assumes tmpNumEntries > 0; a zero capacity would yield +Inf.
func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {

@ -52,7 +52,23 @@ func (Facade) Encoding() encoding.Encoding {
return GzipLogChunk
}
// Utilization implements encoding.Chunk by delegating to the wrapped Loki
// chunk's own Utilization.
func (f Facade) Utilization() float64 {
return f.c.Utilization()
}
// LokiChunk returns the wrapped chunkenc.Chunk.
func (f Facade) LokiChunk() Chunk {
return f.c
}
// UncompressedSize is a helper function to hide the type assertion kludge when
// wanting the uncompressed size of the Cortex interface encoding.Chunk.
// The boolean result is false when c is not a *Facade (i.e. not a Loki chunk);
// in that case the returned size is meaningless and callers should skip it.
func UncompressedSize(c encoding.Chunk) (int, bool) {
f, ok := c.(*Facade)
if !ok {
return 0, false
}
return f.c.UncompressedSize(), true
}

@ -61,7 +61,8 @@ type block struct {
mint, maxt int64
offset int // The offset of the block in the chunk.
offset int // The offset of the block in the chunk.
uncompressedSize int // Total uncompressed size in bytes when the chunk is cut.
}
// This block holds the un-compressed entries. Once it has enough data, this is
@ -313,6 +314,28 @@ func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
return len(c.blocks) < blocksPerChunk
}
// UncompressedSize implements Chunk. It sums the uncompressed bytes of the
// in-progress head block (when non-empty) with the uncompressedSize recorded
// for each block that has already been cut.
func (c *MemChunk) UncompressedSize() int {
size := 0
if !c.head.isEmpty() {
size += c.head.size
}
for _, b := range c.blocks {
size += b.uncompressedSize
}
return size
}
// Utilization implements Chunk. It is the bytes used as a percentage of the
// chunk's maximum uncompressed capacity (blocksPerChunk * blockSize).
func (c *MemChunk) Utilization() float64 {
size := c.UncompressedSize()
return float64(size) / float64(blocksPerChunk*c.blockSize)
}
// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
entryTimestamp := entry.Timestamp.UnixNano()
@ -352,10 +375,11 @@ func (c *MemChunk) cut() error {
}
c.blocks = append(c.blocks, block{
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
uncompressedSize: c.head.size,
})
c.head.entries = c.head.entries[:0]

@ -50,6 +50,8 @@ type Chunk interface {
Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
Utilization() float64
UncompressedSize() int
}
// CompressionWriter is the writer that compresses the data passed to it.

@ -22,9 +22,18 @@ import (
)
var (
chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_utilization",
Help: "Distribution of stored chunk utilization (when stored).",
Buckets: prometheus.LinearBuckets(0, 0.2, 6),
})
memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_memory_chunks",
Help: "The total number of chunks in memory.",
})
chunkEntries = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_entries",
Help: "Distribution of stored chunk entries (when stored).",
Help: "Distribution of stored lines per chunk (when stored).",
Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
})
chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
@ -32,6 +41,11 @@ var (
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(10000, 2, 7), // biggest bucket is 10000*2^(7-1) = 640000 (~640KB)
})
chunkCompressionRatio = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_compression_ratio",
Help: "Compression ratio of chunks (when stored).",
Buckets: prometheus.LinearBuckets(1, 1.5, 6),
})
chunksPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "loki_ingester_chunks_stored_total",
Help: "Total stored chunks per tenant.",
@ -234,6 +248,7 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
now := time.Now()
prevNumChunks := len(stream.chunks)
for len(stream.chunks) > 0 {
if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {
break
@ -242,11 +257,13 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected
stream.chunks = stream.chunks[1:]
}
memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))
if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.Dec()
}
}
@ -292,9 +309,17 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
continue
}
compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(wc.Data)
if ok {
chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}
chunkUtilization.Observe(wc.Data.Utilization())
chunkEntries.Observe(float64(numEntries))
chunkSize.Observe(float64(len(byt)))
sizePerTenant.Add(float64(len(byt)))
chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()
firstTime, _ := cs[i].chunk.Bounds()
chunkAge.Observe(time.Since(firstTime).Seconds())

@ -32,6 +32,10 @@ var (
)
var (
memoryStreams = promauto.NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_memory_streams",
Help: "The total number of streams in memory.",
})
streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_streams_created_total",
@ -89,10 +93,16 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
i.addTailersToNewStream(stream)
}
return stream.consumeChunk(ctx, chunk)
err := stream.consumeChunk(ctx, chunk)
if err == nil {
memoryChunks.Inc()
}
return err
}
func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
@ -113,10 +123,13 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
continue
}
prevNumChunks := len(stream.chunks)
if err := stream.Push(ctx, s.Entries); err != nil {
appendErr = err
continue
}
memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
}
return appendErr
@ -136,6 +149,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)

@ -25,11 +25,6 @@ var (
Name: "ingester_chunks_created_total",
Help: "The total number of chunks created in the ingester.",
})
chunksFlushedTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_flushed_total",
Help: "The total number of chunks flushed by the ingester.",
})
samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "ingester",
@ -42,7 +37,6 @@ var (
func init() {
prometheus.MustRegister(chunksCreatedTotal)
prometheus.MustRegister(chunksFlushedTotal)
prometheus.MustRegister(samplesPerChunk)
}

@ -92,7 +92,7 @@ local utils = import "mixin-utils/utils.libsonnet";
)
.addPanel(
g.panel('Chunks per series') +
g.queryPanel('sum(loki_ingester_memory_chunks{cluster="$cluster", job="$namespace/ingester"}) / sum(loki_ingester_memory_series{job="$namespace/ingester"})', 'chunks'),
g.queryPanel('sum(loki_ingester_memory_chunks{cluster="$cluster", job="$namespace/ingester"}) / sum(loki_ingester_memory_streams{job="$namespace/ingester"})', 'chunks'),
)
)
.addRow(
@ -111,19 +111,19 @@ local utils = import "mixin-utils/utils.libsonnet";
g.row('Flush Stats')
.addPanel(
g.panel('Size') +
g.latencyPanel('loki_ingester_chunk_length', '{cluster="$cluster", job="$namespace/ingester"}', multiplier='1') +
g.latencyPanel('loki_ingester_chunk_entries', '{cluster="$cluster", job="$namespace/ingester"}', multiplier='1') +
{ yaxes: g.yaxes('short') },
)
.addPanel(
g.panel('Entries') +
g.queryPanel('sum(rate(loki_chunk_store_index_entries_per_chunk_sum{cluster="$cluster", job="$namespace/ingester"}[5m])) / sum(rate(loki_chunk_store_index_entries_per_chunk_count{cluster="$cluster", job="$namespace/ingester"}[5m]))', 'entries'),
g.queryPanel('sum(rate(cortex_chunk_store_index_entries_per_chunk_sum{cluster="$cluster", job="$namespace/ingester"}[5m])) / sum(rate(cortex_chunk_store_index_entries_per_chunk_count{cluster="$cluster", job="$namespace/ingester"}[5m]))', 'entries'),
),
)
.addRow(
g.row('Flush Stats')
.addPanel(
g.panel('Queue Length') +
g.queryPanel('loki_ingester_flush_queue_length{cluster="$cluster", job="$namespace/ingester"}', '{{instance}}'),
g.queryPanel('cortex_ingester_flush_queue_length{cluster="$cluster", job="$namespace/ingester"}', '{{instance}}'),
)
.addPanel(
g.panel('Flush Rate') +

Loading…
Cancel
Save