Add iterator for chunks and streams.

pull/1/head
Tom Wilkie 8 years ago
parent d7e1813ff3
commit 847ea21230
  1. 2
      .gitignore
  2. 39
      pkg/ingester/chunk.go
  3. 48
      pkg/ingester/stream.go

2
.gitignore vendored

@ -1,6 +1,8 @@
.uptodate
*.pb.go
!vendor/**/*.pb.go
vendor/github.com/weaveworks/cortex/pkg/ingester/client/cortex.pb.go
vendor/github.com/weaveworks/cortex/pkg/ring/ring.pb.go
.pkg
.cache
cmd/distributor/distributor

@ -2,6 +2,7 @@ package ingester
import (
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
"github.com/pkg/errors"
)
@ -17,6 +18,7 @@ var (
type Chunk interface {
SpaceFor(*logproto.Entry) bool
Push(*logproto.Entry) error
Iterator() querier.EntryIterator
}
func newChunk() Chunk {
@ -24,7 +26,7 @@ func newChunk() Chunk {
}
type dumbChunk struct {
entries []*logproto.Entry
entries []logproto.Entry
}
func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
@ -40,6 +42,39 @@ func (c *dumbChunk) Push(entry *logproto.Entry) error {
return ErrOutOfOrder
}
c.entries = append(c.entries, entry)
c.entries = append(c.entries, *entry)
return nil
}
func (c *dumbChunk) Iterator() querier.EntryIterator {
// Take a copy of the entries to avoid locking
return &dumbChunkIterator{
i: -1,
entries: c.entries,
}
}
type dumbChunkIterator struct {
i int
entries []logproto.Entry
}
func (i *dumbChunkIterator) Next() bool {
return i.i < len(i.entries)
}
func (i *dumbChunkIterator) Entry() logproto.Entry {
return i.entries[i.i]
}
func (i *dumbChunkIterator) Labels() string {
return ""
}
func (i *dumbChunkIterator) Error() error {
return nil
}
func (i *dumbChunkIterator) Close() error {
return nil
}

@ -4,6 +4,7 @@ import (
"context"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
)
const tmpMaxChunks = 3
@ -38,3 +39,50 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
}
return nil
}
func (s *stream) Iterator(labels string) querier.EntryIterator {
iterators := make([]querier.EntryIterator, len(s.chunks))
for i, c := range s.chunks {
iterators[i] = c.Iterator()
}
return &nonOverlappingIterator{
labels: labels,
iterators: iterators,
}
}
type nonOverlappingIterator struct {
labels string
i int
iterators []querier.EntryIterator
curr querier.EntryIterator
}
func (i *nonOverlappingIterator) Next() bool {
for i.curr == nil || !i.curr.Next() {
if i.i >= len(i.iterators) {
return false
}
i.curr = i.iterators[i.i]
i.i++
}
return true
}
func (i *nonOverlappingIterator) Entry() logproto.Entry {
return i.curr.Entry()
}
func (i *nonOverlappingIterator) Labels() string {
return i.labels
}
func (i *nonOverlappingIterator) Error() error {
return nil
}
func (i *nonOverlappingIterator) Close() error {
return nil
}

Loading…
Cancel
Save