Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
loki/pkg/storage/cache.go

182 lines
3.6 KiB

package storage
import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
// cachedIterator is an entry iterator that buffers everything its base
// iterator yields so that the same sequence can be replayed later.
type cachedIterator struct {
	// cache holds the entries read from base, in iteration order.
	cache []*logproto.Entry
	// base is the wrapped iterator; load sets it to nil once it has been
	// drained and closed.
	base iter.EntryIterator
	// labels is what base.Labels() returned after its first successful Next.
	labels string
	// curr is the index in cache of the current entry; -1 means the iterator
	// is positioned before the first entry.
	curr int
	// closeErr and iterErr are the results of base.Close() and base.Error(),
	// recorded by load.
	closeErr, iterErr error
}
// newCachedIterator wraps it in a replayable iterator. The underlying
// iterator is drained and closed immediately, so the returned value never
// touches it again; closing the returned iterator only rewinds it.
// cap is the initial capacity reserved for the cached entries.
// Only the labels seen on the first entry are kept, so this should be used
// for entries that all belong to the same stream.
func newCachedIterator(it iter.EntryIterator, cap int) *cachedIterator {
	cached := &cachedIterator{
		curr:  -1,
		cache: make([]*logproto.Entry, 0, cap),
		base:  it,
	}
	// Eagerly consume the base iterator so it can be released right away.
	cached.load()
	return cached
}
// reset rewinds the iterator to just before the first cached entry.
func (it *cachedIterator) reset() {
	it.curr = -1
}
// load drains the base iterator into the cache and then releases it.
// It is a no-op when there is no base iterator. Whether or not the base
// yields anything, it is closed, its Close and Error results are recorded
// (Close is called first), base is cleared and the position is rewound.
func (it *cachedIterator) load() {
	if it.base == nil {
		return
	}
	defer func() {
		it.closeErr = it.base.Close()
		it.iterErr = it.base.Error()
		it.base = nil
		it.reset()
	}()

	if !it.base.Next() {
		// Nothing to cache; labels stay empty.
		return
	}
	// Labels are taken once, from the first entry.
	it.labels = it.base.Labels()

	// The first entry is already available, so start with more == true and
	// only advance the base after each entry has been stored.
	for more := true; more; more = it.base.Next() {
		// A fresh variable per iteration keeps every cached pointer distinct.
		entry := it.base.Entry()
		it.cache = append(it.cache, &entry)
	}
}
// Next advances to the following cached entry and reports whether there is
// one. When nothing was cached it also drops the empty cache slice.
func (it *cachedIterator) Next() bool {
	size := len(it.cache)
	if size == 0 {
		it.cache = nil
		return false
	}
	// Only move forward while the next index is still inside the cache, so
	// the position stays on the last entry once the end is reached.
	if next := it.curr + 1; next < size {
		it.curr = next
		return true
	}
	return false
}
// Entry returns the current cached entry by value. It returns the zero
// Entry when nothing is cached, and the first cached entry when Next has
// not been called yet.
func (it *cachedIterator) Entry() logproto.Entry {
	if len(it.cache) == 0 {
		return logproto.Entry{}
	}
	// A negative position (before the first Next) maps to the first entry.
	idx := it.curr
	if idx < 0 {
		idx = 0
	}
	return *it.cache[idx]
}
// Labels returns the labels captured from the base iterator at load time.
// It is empty when the base iterator yielded no entries.
func (it *cachedIterator) Labels() string {
	return it.labels
}
// Error returns the error the base iterator reported when it was loaded,
// or nil if there was none.
func (it *cachedIterator) Error() error {
	return it.iterErr
}
// Close rewinds the iterator so the cached entries can be replayed and
// returns the error obtained when the base iterator was closed at load
// time. The cache itself is kept.
func (it *cachedIterator) Close() error {
	err := it.closeErr
	it.reset()
	return err
}
Improve metric queries by computing samples at the edges. (#2293) * First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. 
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>
6 years ago
// cachedSampleIterator is a sample iterator that buffers everything its base
// iterator yields so that the same sequence can be replayed later.
type cachedSampleIterator struct {
	// cache holds the samples read from base, in iteration order.
	cache []logproto.Sample
	// base is the wrapped iterator; load sets it to nil once it has been
	// drained and closed.
	base iter.SampleIterator
	// labels is what base.Labels() returned after its first successful Next.
	labels string
	// curr is the index in cache of the current sample; -1 means the iterator
	// is positioned before the first sample.
	curr int
	// closeErr and iterErr are the results of base.Close() and base.Error(),
	// recorded by load.
	closeErr, iterErr error
}
// newCachedSampleIterator wraps it in a replayable iterator. The underlying
// iterator is drained and closed immediately, so the returned value never
// touches it again; closing the returned iterator only rewinds it.
// cap is the initial capacity reserved for the cached samples.
// Only the labels seen on the first sample are kept, so this should be used
// for samples that all belong to the same stream.
func newCachedSampleIterator(it iter.SampleIterator, cap int) *cachedSampleIterator {
	cached := &cachedSampleIterator{
		curr:  -1,
		cache: make([]logproto.Sample, 0, cap),
		base:  it,
	}
	// Eagerly consume the base iterator so it can be released right away.
	cached.load()
	return cached
}
// reset rewinds the iterator to just before the first cached sample.
func (it *cachedSampleIterator) reset() {
	it.curr = -1
}
// load drains the base iterator into the cache and then releases it.
// It is a no-op when there is no base iterator. Whether or not the base
// yields anything, it is closed, its Close and Error results are recorded
// (Close is called first), base is cleared and the position is rewound.
func (it *cachedSampleIterator) load() {
	if it.base == nil {
		return
	}
	defer func() {
		it.closeErr = it.base.Close()
		it.iterErr = it.base.Error()
		it.base = nil
		it.reset()
	}()

	if !it.base.Next() {
		// Nothing to cache; labels stay empty.
		return
	}
	// Labels are taken once, from the first sample.
	it.labels = it.base.Labels()

	// The first sample is already available, so start with more == true and
	// only advance the base after each sample has been stored.
	for more := true; more; more = it.base.Next() {
		it.cache = append(it.cache, it.base.Sample())
	}
}
// Next advances to the following cached sample and reports whether there is
// one. When nothing was cached it also drops the empty cache slice.
func (it *cachedSampleIterator) Next() bool {
	size := len(it.cache)
	if size == 0 {
		it.cache = nil
		return false
	}
	// Only move forward while the next index is still inside the cache, so
	// the position stays on the last sample once the end is reached.
	if next := it.curr + 1; next < size {
		it.curr = next
		return true
	}
	return false
}
// Sample returns the current cached sample. It returns the zero Sample when
// nothing is cached, and the first cached sample when Next has not been
// called yet.
func (it *cachedSampleIterator) Sample() logproto.Sample {
	switch {
	case len(it.cache) == 0:
		return logproto.Sample{}
	case it.curr < 0:
		// Positioned before the first Next: expose the first sample.
		return it.cache[0]
	default:
		return it.cache[it.curr]
	}
}
// Labels returns the labels captured from the base iterator at load time.
// It is empty when the base iterator yielded no samples.
func (it *cachedSampleIterator) Labels() string {
	return it.labels
}
// Error returns the error the base iterator reported when it was loaded,
// or nil if there was none.
func (it *cachedSampleIterator) Error() error {
	return it.iterErr
}
// Close rewinds the iterator so the cached samples can be replayed and
// returns the error obtained when the base iterator was closed at load
// time. The cache itself is kept.
func (it *cachedSampleIterator) Close() error {
	err := it.closeErr
	it.reset()
	return err
}