Streaming heap/merge for query path.

pull/1/head
Tom Wilkie 8 years ago
parent 35d1b5e667
commit 4f03ff9140
  1. 185
      pkg/querier/iterator.go
  2. 1
      pkg/querier/iterator_test.go
  3. 78
      pkg/querier/querier.go

@ -0,0 +1,185 @@
package querier
import (
"container/heap"
"io"
"github.com/grafana/logish/pkg/logproto"
)
// EntryIterator iterates over entries in time-order.
type EntryIterator interface {
	// Next advances the iterator; it returns false when no entries remain.
	Next() bool
	// Entry returns the current entry. Only valid after a call to Next
	// that returned true.
	Entry() logproto.Entry
	// Labels returns the label set of the stream the current entry
	// belongs to.
	Labels() string
	// Error returns any error encountered during iteration.
	Error() error
	// Close releases any resources held by the iterator.
	Close() error
}
// streamIterator iterates over entries in a stream.
type streamIterator struct {
	i       int              // index of the current entry; starts at -1 (before the first entry)
	entries []logproto.Entry // entries of the stream; presumably already time-ordered — TODO confirm
	labels  string           // label set shared by every entry in the stream
}
// newStreamIterator returns an EntryIterator positioned just before the
// first entry of the given stream.
func newStreamIterator(stream *logproto.Stream) EntryIterator {
	it := streamIterator{
		i:       -1,
		entries: stream.Entries,
		labels:  stream.Labels,
	}
	return &it
}
// Next advances to the next entry. It returns false once all entries
// have been consumed.
func (i *streamIterator) Next() bool {
	// Advance first: the iterator starts at index -1, so the initial call
	// to Next lands on the first entry. The original body omitted this
	// increment, so the iterator never advanced and Entry() indexed -1.
	i.i++
	return i.i < len(i.entries)
}
// Error implements EntryIterator; iterating an in-memory slice cannot
// fail, so this always reports nil.
func (i *streamIterator) Error() error { return nil }
// Labels implements EntryIterator, returning the stream's label set.
func (i *streamIterator) Labels() string { return i.labels }
// Entry implements EntryIterator. It is only valid after Next has
// returned true.
func (i *streamIterator) Entry() logproto.Entry { return i.entries[i.i] }
// Close implements EntryIterator; there is nothing to release.
func (i *streamIterator) Close() error { return nil }
// iteratorHeap implements heap.Interface over a set of EntryIterators,
// ordered by the timestamp of each iterator's current entry (earliest
// first). It is driven exclusively through the container/heap package.
type iteratorHeap []EntryIterator

func (h iteratorHeap) Len() int      { return len(h) }
func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Less orders iterators by their current entry's timestamp.
func (h iteratorHeap) Less(i, j int) bool {
	t1, t2 := h[i].Entry().Timestamp, h[j].Entry().Timestamp
	return t1.Before(t2)
}

// Push appends x to the heap; called only via heap.Push.
func (h *iteratorHeap) Push(x interface{}) {
	*h = append(*h, x.(EntryIterator))
}

// Pop removes and returns the last element; called only via heap.Pop.
func (h *iteratorHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	it := old[last]
	*h = old[:last]
	return it
}
// heapIterator iterates over a heap of iterators.
type heapIterator struct {
	iterators iteratorHeap  // not-yet-exhausted iterators, excluding curr
	curr      EntryIterator // iterator whose entry was last returned; nil before the first Next
}
// newHeapIterator returns an EntryIterator that merges the given
// iterators into a single time-ordered sequence of entries.
func newHeapIterator(is []EntryIterator) EntryIterator {
	result := &heapIterator{
		iterators: make(iteratorHeap, 0, len(is)),
	}
	// Pre-next each iterator so Less can inspect a valid current entry;
	// drop (and close) any that are already empty.
	for _, i := range is {
		if i.Next() {
			result.iterators = append(result.iterators, i)
		} else {
			// Best-effort close of an empty iterator; the error is
			// deliberately ignored as there is nothing useful to do with it.
			i.Close()
		}
	}
	// The iterators were appended in arbitrary order; establish the heap
	// invariant before Next performs any heap.Push/heap.Pop. The original
	// code skipped this, so the first Pop could return a non-minimal
	// element and entries came out in the wrong order.
	heap.Init(&result.iterators)
	return result
}
// Next advances the merged iterator to the entry with the smallest
// timestamp across all underlying iterators.
func (i *heapIterator) Next() bool {
	// Return the previously-popped iterator to the heap if it still has
	// entries; otherwise we are done with it.
	if curr := i.curr; curr != nil {
		if curr.Next() {
			heap.Push(&i.iterators, curr)
		} else {
			curr.Close()
		}
	}
	if i.iterators.Len() == 0 {
		return false
	}
	i.curr = heap.Pop(&i.iterators).(EntryIterator)
	return true
}
// Entry returns the current entry of the most recently popped iterator.
// Only valid after Next has returned true.
func (i *heapIterator) Entry() logproto.Entry { return i.curr.Entry() }
// Labels returns the label set of the stream the current entry came
// from. Only valid after Next has returned true.
func (i *heapIterator) Labels() string { return i.curr.Labels() }
// Error returns any error from the current underlying iterator.
//
// NOTE(review): this only surfaces the error of the last-popped
// iterator; errors from iterators still sitting in the heap are not
// reported here.
func (i *heapIterator) Error() error {
	// Guard against a nil dereference: curr is nil until Next has
	// returned true at least once, and callers typically check Error
	// after the iteration loop ends — which, if every input iterator was
	// empty, happens without curr ever being set.
	if i.curr == nil {
		return nil
	}
	return i.curr.Error()
}
// Close closes every underlying iterator, including the in-flight one.
// It attempts all closes even if some fail, and returns the first error
// encountered.
func (i *heapIterator) Close() error {
	var firstErr error
	// The current iterator has been popped off the heap, so the loop
	// below never sees it; the original code leaked it here.
	// NOTE(review): if Next already closed curr on exhaustion this is a
	// second Close — harmless for the iterators in this file, but worth
	// confirming for future implementations.
	if i.curr != nil {
		if err := i.curr.Close(); err != nil {
			firstErr = err
		}
	}
	// Close the rest; do not bail out early (the original returned on
	// the first error, leaking every iterator after it). Also avoid
	// shadowing the receiver, which the original loop variable did.
	for _, it := range i.iterators {
		if err := it.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// queryResponseIterator returns an iterator that merges all the streams
// in a single query response in time-order.
func queryResponseIterator(resp *logproto.QueryResponse) EntryIterator {
	// Allocate with zero length and full capacity: the original
	// make([]EntryIterator, len(resp.Streams)) followed by append left
	// len(resp.Streams) leading nil iterators in the slice, which would
	// panic inside newHeapIterator when Next is called on them.
	is := make([]EntryIterator, 0, len(resp.Streams))
	for i := range resp.Streams {
		is = append(is, newStreamIterator(resp.Streams[i]))
	}
	return newHeapIterator(is)
}
// queryClientIterator iterates over entries received from a gRPC query
// stream, one batch (QueryResponse) at a time.
type queryClientIterator struct {
	client logproto.Querier_QueryClient
	err    error         // first Recv error, if any (io.EOF is end-of-stream, not an error)
	curr   EntryIterator // iterator over the most recently received batch; nil before the first Recv
}
// newQueryClientIterator wraps a gRPC query stream client in an
// EntryIterator.
func newQueryClientIterator(client logproto.Querier_QueryClient) EntryIterator {
	it := queryClientIterator{client: client}
	return &it
}
// Next advances to the next entry, receiving further batches from the
// client as needed. It returns false at end-of-stream or on a receive
// error (which is then available via Error).
func (i *queryClientIterator) Next() bool {
	// Keep receiving batches until one yields an entry.
	for i.curr == nil || !i.curr.Next() {
		batch, err := i.client.Recv()
		switch {
		case err == io.EOF:
			// End of stream; not an error.
			return false
		case err != nil:
			i.err = err
			return false
		}
		i.curr = queryResponseIterator(batch)
	}
	return true
}
// Entry returns the current entry of the current batch. Only valid
// after Next has returned true.
func (i *queryClientIterator) Entry() logproto.Entry { return i.curr.Entry() }
// Labels returns the label set of the current entry's stream. Only
// valid after Next has returned true.
func (i *queryClientIterator) Labels() string { return i.curr.Labels() }
// Error returns the first receive error encountered, if any.
func (i *queryClientIterator) Error() error { return i.err }
// Close half-closes the underlying gRPC stream from the client side.
func (i *queryClientIterator) Close() error { return i.client.CloseSend() }

@ -0,0 +1 @@
package querier

@ -13,6 +13,8 @@ import (
"github.com/grafana/logish/pkg/logproto"
)
const queryBatchSize = 128
type Config struct {
RemoteTimeout time.Duration
ClientConfig client.Config
@ -42,28 +44,62 @@ func New(cfg Config, ring ring.ReadRing) (*Querier, error) {
}
func (q *Querier) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(queryServer.Context(), req)
})
if err != nil {
return err
}
return q.forAllIngesters(func(client logproto.QuerierClient) error {
_, err := client.Query(queryServer.Context(), req)
if err != nil {
return err
iterators := make([]EntryIterator, len(clients))
for i := range clients {
iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
}
i := newHeapIterator(iterators)
defer i.Close()
streams := map[string]*logproto.Stream{}
respSize := 0
for i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
stream.Entries = append(stream.Entries, entry)
respSize++
// TODO: a heap to in-order merge and dedupe results
if respSize > queryBatchSize {
queryResp := logproto.QueryResponse{
Streams: make([]*logproto.Stream, len(streams)),
}
for _, stream := range streams {
queryResp.Streams = append(queryResp.Streams, stream)
}
if err := queryServer.Send(&queryResp); err != nil {
return err
}
streams = map[string]*logproto.Stream{}
respSize = 0
}
}
return nil
})
return i.Error()
}
// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) error) error {
func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, error)) ([]interface{}, error) {
replicationSet, err := q.ring.GetAll()
if err != nil {
return err
return nil, err
}
errs := make(chan error)
resps, errs := make(chan interface{}), make(chan error)
for _, ingester := range replicationSet.Ingesters {
go func(ingester *ring.IngesterDesc) {
client, err := q.pool.GetClientFor(ingester.Addr)
@ -72,27 +108,31 @@ func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) error) error {
return
}
errs <- f(client.(logproto.QuerierClient))
resp, err := f(client.(logproto.QuerierClient))
if err != nil {
errs <- err
} else {
resps <- resp
}
}(ingester)
}
var lastErr error
numErrs := 0
result, numErrs := []interface{}{}, 0
for range replicationSet.Ingesters {
select {
case err := <-errs:
if err != nil {
lastErr = err
numErrs++
}
case resp := <-resps:
result = append(result, resp)
case lastErr = <-errs:
numErrs++
}
}
if numErrs > replicationSet.MaxErrors {
return lastErr
return nil, lastErr
}
return nil
return result, nil
}
// Check implements the grpc healthcheck

Loading…
Cancel
Save