From 00afed1eb394595f8baced9bb4a04efc0d5d396a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sun, 20 May 2018 11:42:23 +0100 Subject: [PATCH] Add start, end and direction options to querier. Signed-off-by: Tom Wilkie --- cmd/logcli/main.go | 20 +++++++--- pkg/ingester/chunk.go | 51 +++++++++++++++++++++---- pkg/ingester/instance.go | 4 +- pkg/ingester/stream.go | 13 ++++++- pkg/logproto/logproto.proto | 6 +++ pkg/querier/http.go | 35 ++++++++++++++--- pkg/querier/iterator.go | 73 +++++++++++++++++++++++------------- pkg/querier/iterator_test.go | 30 ++++++++++++--- pkg/querier/querier.go | 4 +- 9 files changed, 180 insertions(+), 56 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 8c1a3aeac9..d8d9ca31ec 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -24,8 +24,9 @@ var defaultAddr = "https://log-us.grafana.net/api/prom/query" func main() { var ( - limit = flag.Int("limit", 30, "Limit on number of entries to print") - since = flag.Duration("since", 1*time.Hour, "Lookback window") + limit = flag.Int("limit", 30, "Limit on number of entries to print.") + since = flag.Duration("since", 1*time.Hour, "Lookback window.") + forward = flag.Bool("forward", false, "Scan forwards through logs.") ) flag.Parse() @@ -44,8 +45,13 @@ func main() { start := end.Add(-*since) username := os.Getenv("GRAFANA_USERNAME") password := os.Getenv("GRAFANA_PASSWORD") - url := fmt.Sprintf("%s?query=%s&limit=%d&start=%d&end=%d", - addr, url.QueryEscape(query), *limit, start.Unix(), end.Unix()) + directionStr := "backward" + if *forward { + directionStr = "forward" + } + url := fmt.Sprintf("%s?query=%s&limit=%d&start=%d&end=%d&direction=%s", + addr, url.QueryEscape(query), *limit, start.Unix(), end.Unix(), directionStr) + fmt.Println(url) req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -102,7 +108,11 @@ func main() { } } - iter := querier.NewQueryResponseIterator(&queryResponse) + d := logproto.BACKWARD + if *forward { + d = logproto.FORWARD + } + iter := querier.NewQueryResponseIterator(&queryResponse, d) for iter.Next() { ls := labelsCache[iter.Labels()] ls = subtract(commonLabels, ls) diff --git a/pkg/ingester/chunk.go b/pkg/ingester/chunk.go index b89661f4ab..a31f115323 100644 --- a/pkg/ingester/chunk.go +++ b/pkg/ingester/chunk.go @@ -1,6 +1,10 @@ package ingester import ( + "log" + "sort" + "time" + "github.com/pkg/errors" "github.com/grafana/logish/pkg/logproto" @@ -17,9 +21,10 @@ var ( ) type Chunk interface { + Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool Push(*logproto.Entry) error - Iterator() querier.EntryIterator + Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator Size() int } @@ -31,6 +36,13 @@ type dumbChunk struct { entries []logproto.Entry } +func (c *dumbChunk) Bounds() (time.Time, time.Time) { + if len(c.entries) == 0 { + return time.Time{}, time.Time{} + } + return c.entries[0].Timestamp, c.entries[len(c.entries)-1].Timestamp +} + func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool { return len(c.entries) < tmpNumEntries } @@ -54,22 +66,45 @@ func (c *dumbChunk) Size() int { // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). -func (c *dumbChunk) Iterator() querier.EntryIterator { +func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator { + i := sort.Search(len(c.entries), func(i int) bool { + return from.Before(c.entries[i].Timestamp) + }) + j := sort.Search(len(c.entries), func(j int) bool { + return !through.After(c.entries[j].Timestamp) + }) + log.Println("from", from, "through", through, "i", i, "j", j, len(c.entries), c.entries) + + start := -1 + if direction == logproto.BACKWARD { + start = j - i + } + // Take a copy of the entries to avoid locking return &dumbChunkIterator{ - i: len(c.entries), - entries: c.entries, + direction: direction, + i: start, + entries: c.entries[i:j], } } type dumbChunkIterator struct { - i int - entries []logproto.Entry + direction logproto.Direction + i int + entries []logproto.Entry } func (i *dumbChunkIterator) Next() bool { - i.i-- - return i.i >= 0 + switch i.direction { + case logproto.BACKWARD: + i.i-- + return i.i >= 0 + case logproto.FORWARD: + i.i++ + return i.i < len(i.entries) + default: + panic(i.direction) + } } func (i *dumbChunkIterator) Entry() logproto.Entry { diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index ec55e6f700..89d6178cc4 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -74,11 +74,11 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie i.streamsMtx.Unlock() return ErrStreamMissing } - iterators[j] = stream.Iterator() + iterators[j] = stream.Iterator(req.Start, req.End, req.Direction) } i.streamsMtx.Unlock() - iterator := querier.NewHeapIterator(iterators) + iterator := querier.NewHeapIterator(iterators, req.Direction) defer iterator.Close() return sendBatches(iterator, queryServer, req.Limit) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 0d21050624..23ba1ab3ae 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "time" "github.com/prometheus/prometheus/pkg/labels" @@ -47,11 +48,19 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error { // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). -func (s *stream) Iterator() querier.EntryIterator { +func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator { iterators := make([]querier.EntryIterator, len(s.chunks)) for i, c := range s.chunks { - iterators[i] = c.Iterator() + switch direction { + case logproto.FORWARD: + iterators[len(s.chunks)-i-1] = c.Iterator(from, through, direction) + case logproto.BACKWARD: + iterators[i] = c.Iterator(from, through, direction) + default: + panic(direction) + } } + return &nonOverlappingIterator{ labels: s.labels.String(), iterators: iterators, diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index 2f9eb9fca1..bf8b990217 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -27,6 +27,12 @@ message QueryRequest { uint32 limit = 2; google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + Direction direction = 5; +} + +enum Direction { + FORWARD = 0; + BACKWARD = 1; } message QueryResponse { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index d865869a6d..a1c83dadf0 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -2,9 +2,11 @@ package querier import ( "encoding/json" + "fmt" "net/http" "net/url" "strconv" + "strings" "time" "github.com/grafana/logish/pkg/logproto" @@ -16,7 +18,7 @@ const ( ) func intParam(values url.Values, name string, def int) (int, error) { - value := values.Get("limit") + value := values.Get(name) if value == "" { return def, nil } @@ -25,7 +27,7 @@ func intParam(values url.Values, name string, def int) (int, error) { } func unixTimeParam(values url.Values, name string, def time.Time) (time.Time, error) { - value := values.Get("limit") + value := values.Get(name) if value == "" { return def, nil } @@ -38,6 +40,19 @@ func unixTimeParam(values url.Values, name string, def time.Time) (time.Time, er return time.Unix(secs, 0), nil } +func directionParam(values url.Values, name string, def logproto.Direction) (logproto.Direction, error) { + value := values.Get(name) + if value == "" { + return def, nil + } + + d, ok := logproto.Direction_value[strings.ToUpper(value)] + if !ok { + return logproto.FORWARD, fmt.Errorf("invalid direction '%s'", value) + } + return logproto.Direction(d), nil +} + func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() query := params.Get("query") @@ -60,12 +75,20 @@ func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) { return } + direction, err := directionParam(params, "direction", logproto.BACKWARD) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + request := logproto.QueryRequest{ - Query: query, - Limit: uint32(limit), - Start: start, - End: end, + Query: query, + Limit: uint32(limit), + Start: start, + End: end, + Direction: direction, } + fmt.Println(request) result, err := q.Query(r.Context(), &request) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/pkg/querier/iterator.go b/pkg/querier/iterator.go index e0047e538f..e742e35abc 100644 --- a/pkg/querier/iterator.go +++ b/pkg/querier/iterator.go @@ -55,10 +55,7 @@ func (i *streamIterator) Close() error { type iteratorHeap []EntryIterator -func (h iteratorHeap) Len() int { return len(h) } -func (h iteratorHeap) Less(i, j int) bool { - return h[i].Entry().Timestamp.After(h[j].Entry().Timestamp) -} +func (h iteratorHeap) Len() int { return len(h) } func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *iteratorHeap) Push(x interface{}) { @@ -73,28 +70,50 @@ func (h *iteratorHeap) Pop() interface{} { return x } +type iteratorMinHeap struct { + iteratorHeap +} + +func (h iteratorMinHeap) Less(i, j int) bool { + return h.iteratorHeap[i].Entry().Timestamp.Before(h.iteratorHeap[j].Entry().Timestamp) +} + +type iteratorMaxHeap struct { + iteratorHeap +} + +func (h iteratorMaxHeap) Less(i, j int) bool { + return h.iteratorHeap[i].Entry().Timestamp.After(h.iteratorHeap[j].Entry().Timestamp) +} + // heapIterator iterates over a heap of iterators. type heapIterator struct { - iterators iteratorHeap - curr EntryIterator - errs []error + heap heap.Interface + curr EntryIterator + errs []error } -func NewHeapIterator(is []EntryIterator) EntryIterator { - result := &heapIterator{ - iterators: make(iteratorHeap, 0, len(is)), - } - +func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIterator { + result := &heapIterator{} + iterators := make(iteratorHeap, 0, len(is)) // pre-next each iterator, drop empty. for _, i := range is { if i.Next() { - result.iterators = append(result.iterators, i) + iterators = append(iterators, i) } else { result.recordError(i) i.Close() } } + switch direction { + case logproto.BACKWARD: + result.heap = &iteratorMaxHeap{iterators} + case logproto.FORWARD: + result.heap = &iteratorMinHeap{iterators} + default: + panic("bad direction") + } return result } @@ -108,18 +127,18 @@ func (i *heapIterator) recordError(ei EntryIterator) { func (i *heapIterator) Next() bool { if i.curr != nil { if i.curr.Next() { - heap.Push(&i.iterators, i.curr) + heap.Push(i.heap, i.curr) } else { i.recordError(i.curr) i.curr.Close() } } - if len(i.iterators) == 0 { + if i.heap.Len() == 0 { return false } - i.curr = heap.Pop(&i.iterators).(EntryIterator) + i.curr = heap.Pop(i.heap).(EntryIterator) return true } @@ -143,31 +162,33 @@ func (i *heapIterator) Error() error { } func (i *heapIterator) Close() error { - for _, i := range i.iterators { - if err := i.Close(); err != nil { + for i.heap.Len() > 0 { + if err := i.heap.Pop().(EntryIterator).Close(); err != nil { return err } } return nil } -func NewQueryResponseIterator(resp *logproto.QueryResponse) EntryIterator { +func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator { is := make([]EntryIterator, 0, len(resp.Streams)) for i := range resp.Streams { is = append(is, newStreamIterator(resp.Streams[i])) } - return NewHeapIterator(is) + return NewHeapIterator(is, direction) } type queryClientIterator struct { - client logproto.Querier_QueryClient - err error - curr EntryIterator + client logproto.Querier_QueryClient + direction logproto.Direction + err error + curr EntryIterator } -func newQueryClientIterator(client logproto.Querier_QueryClient) EntryIterator { +func newQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator { return &queryClientIterator{ - client: client, + client: client, + direction: direction, } } @@ -181,7 +202,7 @@ func (i *queryClientIterator) Next() bool { return false } - i.curr = NewQueryResponseIterator(batch) + i.curr = NewQueryResponseIterator(batch, i.direction) } return true diff --git a/pkg/querier/iterator_test.go b/pkg/querier/iterator_test.go index 02a0c476f4..ddf2693f71 100644 --- a/pkg/querier/iterator_test.go +++ b/pkg/querier/iterator_test.go @@ -16,10 +16,10 @@ func TestStreamIterator(t *testing.T) { Timestamp: time.Unix(-i, 0), } }) - testIterator(t, iterator, testSize) + testIterator(t, iterator, testSize, logproto.BACKWARD) } -func TestHeapIterator(t *testing.T) { +func TestHeapIteratorBackward(t *testing.T) { iterators := []EntryIterator{} for i := int64(0); i < 4; i++ { iterators = append(iterators, mkStreamIterator(testSize/4, func(j int64) logproto.Entry { @@ -28,7 +28,19 @@ func TestHeapIterator(t *testing.T) { } })) } - testIterator(t, NewHeapIterator(iterators), testSize) + testIterator(t, NewHeapIterator(iterators, logproto.BACKWARD), testSize, logproto.BACKWARD) +} + +func TestHeapIteratorForward(t *testing.T) { + iterators := []EntryIterator{} + for i := int64(0); i < 4; i++ { + iterators = append(iterators, mkStreamIterator(testSize/4, func(j int64) logproto.Entry { + return logproto.Entry{ + Timestamp: time.Unix(j*4+i, 0), + } + })) + } + testIterator(t, NewHeapIterator(iterators, logproto.FORWARD), testSize, logproto.FORWARD) } func mkStreamIterator(numEntries int64, f func(i int64) logproto.Entry) EntryIterator { @@ -41,10 +53,18 @@ func mkStreamIterator(numEntries int64, f func(i int64) logproto.Entry) EntryIte }) } -func testIterator(t *testing.T, iterator EntryIterator, testSize int64) { +func testIterator(t *testing.T, iterator EntryIterator, + testSize int64, direction logproto.Direction) { i := int64(0) for ; i < testSize && iterator.Next(); i++ { - assert.Equal(t, -i, iterator.Entry().Timestamp.Unix()) + switch direction { + case logproto.BACKWARD: + assert.Equal(t, -i, iterator.Entry().Timestamp.Unix()) + case logproto.FORWARD: + assert.Equal(t, i, iterator.Entry().Timestamp.Unix()) + default: + panic(direction) + } } assert.Equal(t, i, int64(testSize)) assert.NoError(t, iterator.Error()) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 132c2e06fd..91409ca59e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -99,9 +99,9 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr iterators := make([]EntryIterator, len(clients)) for i := range clients { - iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient)) + iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient), req.Direction) } - iterator := NewHeapIterator(iterators) + iterator := NewHeapIterator(iterators, req.Direction) defer iterator.Close() resp, _, err := ReadBatch(iterator, req.Limit)