Add start, end and direction options to querier.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/8/head
Tom Wilkie 8 years ago
parent 714e5d357b
commit 00afed1eb3
  1. 20
      cmd/logcli/main.go
  2. 51
      pkg/ingester/chunk.go
  3. 4
      pkg/ingester/instance.go
  4. 13
      pkg/ingester/stream.go
  5. 6
      pkg/logproto/logproto.proto
  6. 35
      pkg/querier/http.go
  7. 73
      pkg/querier/iterator.go
  8. 30
      pkg/querier/iterator_test.go
  9. 4
      pkg/querier/querier.go

@ -24,8 +24,9 @@ var defaultAddr = "https://log-us.grafana.net/api/prom/query"
func main() {
var (
limit = flag.Int("limit", 30, "Limit on number of entries to print")
since = flag.Duration("since", 1*time.Hour, "Lookback window")
limit = flag.Int("limit", 30, "Limit on number of entries to print.")
since = flag.Duration("since", 1*time.Hour, "Lookback window.")
forward = flag.Bool("forward", false, "Scan forwards through logs.")
)
flag.Parse()
@ -44,8 +45,13 @@ func main() {
start := end.Add(-*since)
username := os.Getenv("GRAFANA_USERNAME")
password := os.Getenv("GRAFANA_PASSWORD")
url := fmt.Sprintf("%s?query=%s&limit=%d&start=%d&end=%d",
addr, url.QueryEscape(query), *limit, start.Unix(), end.Unix())
directionStr := "backward"
if *forward {
directionStr = "forward"
}
url := fmt.Sprintf("%s?query=%s&limit=%d&start=%d&end=%d&direction=%s",
addr, url.QueryEscape(query), *limit, start.Unix(), end.Unix(), directionStr)
fmt.Println(url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@ -102,7 +108,11 @@ func main() {
}
}
iter := querier.NewQueryResponseIterator(&queryResponse)
d := logproto.BACKWARD
if *forward {
d = logproto.FORWARD
}
iter := querier.NewQueryResponseIterator(&queryResponse, d)
for iter.Next() {
ls := labelsCache[iter.Labels()]
ls = subtract(commonLabels, ls)

@ -1,6 +1,10 @@
package ingester
import (
"log"
"sort"
"time"
"github.com/pkg/errors"
"github.com/grafana/logish/pkg/logproto"
@ -17,9 +21,10 @@ var (
)
type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Push(*logproto.Entry) error
Iterator() querier.EntryIterator
Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator
Size() int
}
@ -31,6 +36,13 @@ type dumbChunk struct {
entries []logproto.Entry
}
func (c *dumbChunk) Bounds() (time.Time, time.Time) {
if len(c.entries) == 0 {
return time.Time{}, time.Time{}
}
return c.entries[0].Timestamp, c.entries[len(c.entries)-1].Timestamp
}
func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
return len(c.entries) < tmpNumEntries
}
@ -54,22 +66,45 @@ func (c *dumbChunk) Size() int {
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator() querier.EntryIterator {
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator {
i := sort.Search(len(c.entries), func(i int) bool {
return from.Before(c.entries[i].Timestamp)
})
j := sort.Search(len(c.entries), func(j int) bool {
return !through.After(c.entries[j].Timestamp)
})
log.Println("from", from, "through", through, "i", i, "j", j, len(c.entries), c.entries)
start := -1
if direction == logproto.BACKWARD {
start = j - i
}
// Take a copy of the entries to avoid locking
return &dumbChunkIterator{
i: len(c.entries),
entries: c.entries,
direction: direction,
i: start,
entries: c.entries[i:j],
}
}
type dumbChunkIterator struct {
i int
entries []logproto.Entry
direction logproto.Direction
i int
entries []logproto.Entry
}
func (i *dumbChunkIterator) Next() bool {
i.i--
return i.i >= 0
switch i.direction {
case logproto.BACKWARD:
i.i--
return i.i >= 0
case logproto.FORWARD:
i.i++
return i.i < len(i.entries)
default:
panic(i.direction)
}
}
func (i *dumbChunkIterator) Entry() logproto.Entry {

@ -74,11 +74,11 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
i.streamsMtx.Unlock()
return ErrStreamMissing
}
iterators[j] = stream.Iterator()
iterators[j] = stream.Iterator(req.Start, req.End, req.Direction)
}
i.streamsMtx.Unlock()
iterator := querier.NewHeapIterator(iterators)
iterator := querier.NewHeapIterator(iterators, req.Direction)
defer iterator.Close()
return sendBatches(iterator, queryServer, req.Limit)

@ -2,6 +2,7 @@ package ingester
import (
"context"
"time"
"github.com/prometheus/prometheus/pkg/labels"
@ -47,11 +48,19 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (s *stream) Iterator() querier.EntryIterator {
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) querier.EntryIterator {
iterators := make([]querier.EntryIterator, len(s.chunks))
for i, c := range s.chunks {
iterators[i] = c.Iterator()
switch direction {
case logproto.FORWARD:
iterators[len(s.chunks)-i-1] = c.Iterator(from, through, direction)
case logproto.BACKWARD:
iterators[i] = c.Iterator(from, through, direction)
default:
panic(direction)
}
}
return &nonOverlappingIterator{
labels: s.labels.String(),
iterators: iterators,

@ -27,6 +27,12 @@ message QueryRequest {
uint32 limit = 2;
google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
Direction direction = 5;
}
enum Direction {
FORWARD = 0;
BACKWARD = 1;
}
message QueryResponse {

@ -2,9 +2,11 @@ package querier
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/grafana/logish/pkg/logproto"
@ -16,7 +18,7 @@ const (
)
func intParam(values url.Values, name string, def int) (int, error) {
value := values.Get("limit")
value := values.Get(name)
if value == "" {
return def, nil
}
@ -25,7 +27,7 @@ func intParam(values url.Values, name string, def int) (int, error) {
}
func unixTimeParam(values url.Values, name string, def time.Time) (time.Time, error) {
value := values.Get("limit")
value := values.Get(name)
if value == "" {
return def, nil
}
@ -38,6 +40,19 @@ func unixTimeParam(values url.Values, name string, def time.Time) (time.Time, er
return time.Unix(secs, 0), nil
}
func directionParam(values url.Values, name string, def logproto.Direction) (logproto.Direction, error) {
value := values.Get(name)
if value == "" {
return def, nil
}
d, ok := logproto.Direction_value[strings.ToUpper(value)]
if !ok {
return logproto.FORWARD, fmt.Errorf("invalid direction '%s'", value)
}
return logproto.Direction(d), nil
}
func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
query := params.Get("query")
@ -60,12 +75,20 @@ func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
return
}
direction, err := directionParam(params, "direction", logproto.BACKWARD)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
request := logproto.QueryRequest{
Query: query,
Limit: uint32(limit),
Start: start,
End: end,
Query: query,
Limit: uint32(limit),
Start: start,
End: end,
Direction: direction,
}
fmt.Println(request)
result, err := q.Query(r.Context(), &request)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)

@ -55,10 +55,7 @@ func (i *streamIterator) Close() error {
type iteratorHeap []EntryIterator
func (h iteratorHeap) Len() int { return len(h) }
func (h iteratorHeap) Less(i, j int) bool {
return h[i].Entry().Timestamp.After(h[j].Entry().Timestamp)
}
func (h iteratorHeap) Len() int { return len(h) }
func (h iteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *iteratorHeap) Push(x interface{}) {
@ -73,28 +70,50 @@ func (h *iteratorHeap) Pop() interface{} {
return x
}
type iteratorMinHeap struct {
iteratorHeap
}
func (h iteratorMinHeap) Less(i, j int) bool {
return h.iteratorHeap[i].Entry().Timestamp.Before(h.iteratorHeap[j].Entry().Timestamp)
}
type iteratorMaxHeap struct {
iteratorHeap
}
func (h iteratorMaxHeap) Less(i, j int) bool {
return h.iteratorHeap[i].Entry().Timestamp.After(h.iteratorHeap[j].Entry().Timestamp)
}
// heapIterator iterates over a heap of iterators.
type heapIterator struct {
iterators iteratorHeap
curr EntryIterator
errs []error
heap heap.Interface
curr EntryIterator
errs []error
}
func NewHeapIterator(is []EntryIterator) EntryIterator {
result := &heapIterator{
iterators: make(iteratorHeap, 0, len(is)),
}
func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIterator {
result := &heapIterator{}
iterators := make(iteratorHeap, 0, len(is))
// pre-next each iterator, drop empty.
for _, i := range is {
if i.Next() {
result.iterators = append(result.iterators, i)
iterators = append(iterators, i)
} else {
result.recordError(i)
i.Close()
}
}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{iterators}
case logproto.FORWARD:
result.heap = &iteratorMinHeap{iterators}
default:
panic("bad direction")
}
return result
}
@ -108,18 +127,18 @@ func (i *heapIterator) recordError(ei EntryIterator) {
func (i *heapIterator) Next() bool {
if i.curr != nil {
if i.curr.Next() {
heap.Push(&i.iterators, i.curr)
heap.Push(i.heap, i.curr)
} else {
i.recordError(i.curr)
i.curr.Close()
}
}
if len(i.iterators) == 0 {
if i.heap.Len() == 0 {
return false
}
i.curr = heap.Pop(&i.iterators).(EntryIterator)
i.curr = heap.Pop(i.heap).(EntryIterator)
return true
}
@ -143,31 +162,33 @@ func (i *heapIterator) Error() error {
}
func (i *heapIterator) Close() error {
for _, i := range i.iterators {
if err := i.Close(); err != nil {
for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
}
}
return nil
}
func NewQueryResponseIterator(resp *logproto.QueryResponse) EntryIterator {
func NewQueryResponseIterator(resp *logproto.QueryResponse, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(resp.Streams))
for i := range resp.Streams {
is = append(is, newStreamIterator(resp.Streams[i]))
}
return NewHeapIterator(is)
return NewHeapIterator(is, direction)
}
type queryClientIterator struct {
client logproto.Querier_QueryClient
err error
curr EntryIterator
client logproto.Querier_QueryClient
direction logproto.Direction
err error
curr EntryIterator
}
func newQueryClientIterator(client logproto.Querier_QueryClient) EntryIterator {
func newQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
return &queryClientIterator{
client: client,
client: client,
direction: direction,
}
}
@ -181,7 +202,7 @@ func (i *queryClientIterator) Next() bool {
return false
}
i.curr = NewQueryResponseIterator(batch)
i.curr = NewQueryResponseIterator(batch, i.direction)
}
return true

@ -16,10 +16,10 @@ func TestStreamIterator(t *testing.T) {
Timestamp: time.Unix(-i, 0),
}
})
testIterator(t, iterator, testSize)
testIterator(t, iterator, testSize, logproto.BACKWARD)
}
func TestHeapIterator(t *testing.T) {
func TestHeapIteratorBackward(t *testing.T) {
iterators := []EntryIterator{}
for i := int64(0); i < 4; i++ {
iterators = append(iterators, mkStreamIterator(testSize/4, func(j int64) logproto.Entry {
@ -28,7 +28,19 @@ func TestHeapIterator(t *testing.T) {
}
}))
}
testIterator(t, NewHeapIterator(iterators), testSize)
testIterator(t, NewHeapIterator(iterators, logproto.BACKWARD), testSize, logproto.BACKWARD)
}
func TestHeapIteratorForward(t *testing.T) {
iterators := []EntryIterator{}
for i := int64(0); i < 4; i++ {
iterators = append(iterators, mkStreamIterator(testSize/4, func(j int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(j*4+i, 0),
}
}))
}
testIterator(t, NewHeapIterator(iterators, logproto.FORWARD), testSize, logproto.FORWARD)
}
func mkStreamIterator(numEntries int64, f func(i int64) logproto.Entry) EntryIterator {
@ -41,10 +53,18 @@ func mkStreamIterator(numEntries int64, f func(i int64) logproto.Entry) EntryIte
})
}
func testIterator(t *testing.T, iterator EntryIterator, testSize int64) {
func testIterator(t *testing.T, iterator EntryIterator,
testSize int64, direction logproto.Direction) {
i := int64(0)
for ; i < testSize && iterator.Next(); i++ {
assert.Equal(t, -i, iterator.Entry().Timestamp.Unix())
switch direction {
case logproto.BACKWARD:
assert.Equal(t, -i, iterator.Entry().Timestamp.Unix())
case logproto.FORWARD:
assert.Equal(t, i, iterator.Entry().Timestamp.Unix())
default:
panic(direction)
}
}
assert.Equal(t, i, int64(testSize))
assert.NoError(t, iterator.Error())

@ -99,9 +99,9 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr
iterators := make([]EntryIterator, len(clients))
for i := range clients {
iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient), req.Direction)
}
iterator := NewHeapIterator(iterators)
iterator := NewHeapIterator(iterators, req.Direction)
defer iterator.Close()
resp, _, err := ReadBatch(iterator, req.Limit)

Loading…
Cancel
Save