Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/ingester/instance.go

399 lines
9.4 KiB

package ingester
import (
"context"
"net/http"
"sync"
"time"
"github.com/pkg/errors"
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
cutil "github.com/cortexproject/cortex/pkg/util"
Adds configurable compression algorithms for chunks (#1411) * Adds L4Z encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds encoding benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds chunk size test Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy v2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove chunkenc Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update lz4 to latest master version. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use temporary buffer in serialise method to avoid allocations when doing string -> byte conversion. It also makes code little more readable. We pool those buffers for reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added gzip -1 for comparison. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Initialize reader and buffered reader lazily. This helps with reader/buffered reader reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't keep entries, extracted generateData function (mostly to get more understandable profile) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Improve test and benchmark to cover all encodings. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds support for a new chunk format with encoding info. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Ingesters now support encoding config. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add support for no compression. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add docs Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove default Gzip for ByteChunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Removes none, snappyv2 and gzip-1 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move log test lines to testdata and add supported encoding stringer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * got linted Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
6 years ago
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
)
const queryBatchSize = 128
// Errors returned on Query.
var (
ErrStreamMissing = errors.New("Stream missing")
)
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
var (
memoryStreams = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "ingester_memory_streams",
Help: "The total number of streams in memory.",
})
streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
Name: "ingester_streams_created_total",
Help: "The total number of streams created per tenant.",
}, []string{"tenant"})
streamsRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
Name: "ingester_streams_removed_total",
Help: "The total number of streams removed per tenant.",
}, []string{"tenant"})
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
)
type instance struct {
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream // we use 'mapped' fingerprints here.
index *index.InvertedIndex
mapper *fpMapper // using of mapper needs streamsMtx because it calls back
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
instanceID string
streamsCreatedTotal prometheus.Counter
streamsRemovedTotal prometheus.Counter
Adds configurable compression algorithms for chunks (#1411) * Adds L4Z encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds encoding benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds chunk size test Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy v2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove chunkenc Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update lz4 to latest master version. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use temporary buffer in serialise method to avoid allocations when doing string -> byte conversion. It also makes code little more readable. We pool those buffers for reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added gzip -1 for comparison. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Initialize reader and buffered reader lazily. This helps with reader/buffered reader reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't keep entries, extracted generateData function (mostly to get more understandable profile) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Improve test and benchmark to cover all encodings. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds support for a new chunk format with encoding info. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Ingesters now support encoding config. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add support for no compression. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add docs Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove default Gzip for ByteChunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Removes none, snappyv2 and gzip-1 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move log test lines to testdata and add supported encoding stringer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * got linted Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
6 years ago
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
limiter *Limiter
Adds configurable compression algorithms for chunks (#1411) * Adds L4Z encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds encoding benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds chunk size test Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy v2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove chunkenc Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update lz4 to latest master version. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use temporary buffer in serialise method to avoid allocations when doing string -> byte conversion. It also makes code little more readable. We pool those buffers for reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added gzip -1 for comparison. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Initialize reader and buffered reader lazily. This helps with reader/buffered reader reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't keep entries, extracted generateData function (mostly to get more understandable profile) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Improve test and benchmark to cover all encodings. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds support for a new chunk format with encoding info. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Ingesters now support encoding config. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add support for no compression. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add docs Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove default Gzip for ByteChunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Removes none, snappyv2 and gzip-1 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move log test lines to testdata and add supported encoding stringer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * got linted Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
6 years ago
factory func() chunkenc.Chunk
// sync
syncPeriod time.Duration
syncMinUtil float64
}
func newInstance(instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
8 years ago
instanceID: instanceID,
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
Adds configurable compression algorithms for chunks (#1411) * Adds L4Z encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds encoding benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds chunk size test Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy v2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove chunkenc Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update lz4 to latest master version. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use temporary buffer in serialise method to avoid allocations when doing string -> byte conversion. It also makes code little more readable. We pool those buffers for reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added gzip -1 for comparison. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Initialize reader and buffered reader lazily. This helps with reader/buffered reader reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't keep entries, extracted generateData function (mostly to get more understandable profile) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Improve test and benchmark to cover all encodings. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds support for a new chunk format with encoding info. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Ingesters now support encoding config. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add support for no compression. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add docs Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove default Gzip for ByteChunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Removes none, snappyv2 and gzip-1 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move log test lines to testdata and add supported encoding stringer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * got linted Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
6 years ago
factory: factory,
tailers: map[uint32]*tailer{},
limiter: limiter,
syncPeriod: syncPeriod,
syncMinUtil: syncMinUtil,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
}
// consumeChunk manually adds a chunk that was received during ingester chunk
// transfer.
func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)
stream, ok := i.streams[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
Adds configurable compression algorithms for chunks (#1411) * Adds L4Z encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds encoding benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds chunk size test Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy v2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove chunkenc Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update lz4 to latest master version. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use temporary buffer in serialise method to avoid allocations when doing string -> byte conversion. It also makes code little more readable. We pool those buffers for reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added gzip -1 for comparison. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Initialize reader and buffered reader lazily. This helps with reader/buffered reader reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't keep entries, extracted generateData function (mostly to get more understandable profile) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Improve test and benchmark to cover all encodings. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds support for a new chunk format with encoding info. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Ingesters now support encoding config. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add support for no compression. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add docs Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove default Gzip for ByteChunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Removes none, snappyv2 and gzip-1 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move log test lines to testdata and add supported encoding stringer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * got linted Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
6 years ago
stream = newStream(fp, sortedLabels, i.factory)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
i.addTailersToNewStream(stream)
}
err := stream.consumeChunk(ctx, chunk)
if err == nil {
memoryChunks.Inc()
}
return err
}
func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
var appendErr error
for _, s := range req.Streams {
labels, err := util.ToClientLabels(s.Labels)
if err != nil {
appendErr = err
continue
}
stream, err := i.getOrCreateStream(labels)
if err != nil {
appendErr = err
continue
}
prevNumChunks := len(stream.chunks)
if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil {
appendErr = err
continue
}
memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
}
return appendErr
}
func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)
stream, ok := i.streams[fp]
if ok {
return stream, nil
}
err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}
sortedLabels := i.index.Add(labels, fp)
Adds configurable compression algorithms for chunks (#1411) * Adds L4Z encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds encoding benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy encoding. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds chunk size test Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds snappy v2 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve benchmarks Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove chunkenc Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update lz4 to latest master version. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Use temporary buffer in serialise method to avoid allocations when doing string -> byte conversion. It also makes code little more readable. We pool those buffers for reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Added gzip -1 for comparison. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Initialize reader and buffered reader lazily. This helps with reader/buffered reader reuse. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Don't keep entries, extracted generateData function (mostly to get more understandable profile) Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com> * Improve test and benchmark to cover all encodings. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds support for a new chunk format with encoding info. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Ingesters now support encoding config. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add support for no compression. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add docs Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Remove default Gzip for ByteChunk. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Removes none, snappyv2 and gzip-1 Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move log test lines to testdata and add supported encoding stringer Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * got linted Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
6 years ago
stream = newStream(fp, sortedLabels, i.factory)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
return stream, nil
}
// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
s := i.streams[fp]
if s == nil {
return nil
}
return s.labels
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
return err
}
filter, err := expr.Filter()
if err != nil {
return err
}
var iters []iter.EntryIterator
err = i.forMatchingStreams(
expr.Matchers(),
func(stream *stream) error {
iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter)
if err != nil {
return err
}
iters = append(iters, iter)
return nil
},
)
if err != nil {
return err
}
iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction)
defer helpers.LogError("closing iterator", iter.Close)
return sendBatches(iter, queryServer, req.Limit)
}
func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
var labels []string
if req.Values {
values := i.index.LabelValues(req.Name)
labels = make([]string, len(values))
for i := 0; i < len(values); i++ {
labels[i] = values[i]
}
} else {
names := i.index.LabelNames()
labels = make([]string, len(names))
for i := 0; i < len(names); i++ {
labels[i] = names[i]
}
}
return &logproto.LabelResponse{
Values: labels,
}, nil
}
func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
groups, err := loghttp.Match(req.GetGroups())
if err != nil {
return nil, err
}
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}
dedupedSeries[key] = logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
return nil
})
if err != nil {
return nil, err
}
}
series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)
}
return &logproto.SeriesResponse{Series: series}, nil
}
// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream acces without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
matchers []*labels.Matcher,
fn func(*stream) error,
) error {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()
filters, matchers := cutil.SplitFiltersAndMatchers(matchers)
ids := i.index.Lookup(matchers)
outer:
for _, streamID := range ids {
stream, ok := i.streams[streamID]
if !ok {
return ErrStreamMissing
}
for _, filter := range filters {
if !filter.Matches(stream.labels.Get(filter.Name)) {
continue outer
}
}
err := fn(stream)
if err != nil {
return err
}
}
return nil
}
func (i *instance) addNewTailer(t *tailer) {
i.streamsMtx.RLock()
for _, stream := range i.streams {
if stream.matchesTailer(t) {
stream.addTailer(t)
}
}
i.streamsMtx.RUnlock()
i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
i.tailers[t.getID()] = t
}
func (i *instance) addTailersToNewStream(stream *stream) {
closedTailers := []uint32{}
i.tailerMtx.RLock()
for _, t := range i.tailers {
if t.isClosed() {
closedTailers = append(closedTailers, t.getID())
continue
}
if stream.matchesTailer(t) {
stream.addTailer(t)
}
}
i.tailerMtx.RUnlock()
if len(closedTailers) != 0 {
i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
for _, closedTailer := range closedTailers {
delete(i.tailers, closedTailer)
}
}
}
func (i *instance) closeTailers() {
i.tailerMtx.Lock()
defer i.tailerMtx.Unlock()
for _, t := range i.tailers {
t.close()
}
}
func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
fluent-bit shared object go plugin (#847) * Import fluent-bit-go-loki plugin sources * Add fluent-bit-go package constraint * Ensure dependencies with dep ``` $ dep ensure ``` * Update .gitignore * Move fluent-bit-go-loki to fluent-bit/fluent-bit-go-loki directory * Add documentation for fluent-bit-go-loki * Fix for lint * Fix for goimports errors * Display fluent-bit-go-loki plugin version * Use more descriptive description for fluent-bit Loki plugin * Remove needless newlines * Rectify fluent-bit-go-loki makefile Remove needless parts and rename meaningless target * Use more descriptive description in README * refactor(fluent-bit): move make targets into root Makefile * Organize imports * Reorder imports with goimports $ goimports -w out_loki.go * Remove needless LICENSE file Because it is the same for Loki repository. * Rely on Loki client retry mechanism * Make descriptions more clearly * Change default unit * BatchWait: msec -> sec * BatchSize: KiB -> Byte * Use logql to parse Loki native labels format * Use logger instead of fmt.Printf * Update documentation to reflect recent changes * Use prometheus version module to print version * Dump error message with logger * Use DYN_GO_FLAGS instead of custom FLAGS for fluent-bit-plugin * Support RemoveKeys parameter * use logql.ParseMatchers() * refactor fluent-bit go plugin * add ci step for docker image * fix lint issues * fix a failing url test * fix a failing url test * add build images for drone ci * Adding labelmap file to complex record to labels Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add helm chart and fix a bug with labelmap * New push path * dep sync * Move fluent-bit folder and use jsonnet for drone * fix build * Adds more documentation and include a basic config in the container Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * remove binaries * Some readme grammar fixes * docs: really nitpicky grammar change in clients README * Add helm documentation * fixes bad config * bump chart * fixes .gitignore
6 years ago
if limit == 0 {
return sendAllBatches(i, queryServer)
}
sent := uint32(0)
for sent < limit && !isDone(queryServer.Context()) {
batch, batchSize, err := iter.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent))
if err != nil {
return err
}
sent += batchSize
if len(batch.Streams) == 0 {
return nil
}
if err := queryServer.Send(batch); err != nil {
return err
}
}
return nil
}
func sendAllBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer) error {
for !isDone(queryServer.Context()) {
batch, _, err := iter.ReadBatch(i, queryBatchSize)
if err != nil {
return err
}
if len(batch.Streams) == 0 {
return nil
}
if err := queryServer.Send(batch); err != nil {
return err
}
}
return nil
}