From a2c4df0416c845bb1c5eb9b7beba88af074ed301 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 18 May 2018 20:01:38 +0100 Subject: [PATCH] Plumb start & end timestamps through the server. Signed-off-by: Tom Wilkie --- pkg/logproto/logproto.proto | 3 ++ pkg/querier/http.go | 93 ++++++++++++++++++------------------- pkg/querier/querier.go | 42 +++++++++++++++++ 3 files changed, 89 insertions(+), 49 deletions(-) diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index 3178d8aedf..2f9eb9fca1 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -24,6 +24,9 @@ message PushResponse { message QueryRequest { string query = 1; + uint32 limit = 2; + google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; } message QueryResponse { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index e43bd6f1f8..d865869a6d 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -1,84 +1,79 @@ package querier import ( - "context" "encoding/json" "net/http" + "net/url" "strconv" + "time" "github.com/grafana/logish/pkg/logproto" ) const ( defaultQueryLimit = 100 + defaulSince = 1 * time.Hour ) +func intParam(values url.Values, name string, def int) (int, error) { + value := values.Get("limit") + if value == "" { + return def, nil + } + + return strconv.Atoi(value) +} + +func unixTimeParam(values url.Values, name string, def time.Time) (time.Time, error) { + value := values.Get("limit") + if value == "" { + return def, nil + } + + secs, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return time.Time{}, err + } + + return time.Unix(secs, 0), nil +} + func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() query := params.Get("query") - limit := defaultQueryLimit - if limitStr := params.Get("limit"); limitStr != "" { - var err error - limit, err = strconv.Atoi(limitStr) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } + limit, err := intParam(params, "limit", defaultQueryLimit) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - result, err := q.Query(r.Context(), query, limit) + now := time.Now() + start, err := unixTimeParam(params, "start", now.Add(-defaulSince)) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - if err := json.NewEncoder(w).Encode(result); err != nil { + end, err := unixTimeParam(params, "end", now) + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } -} -func (q *Querier) Query(ctx context.Context, query string, limit int) (*logproto.QueryResponse, error) { - req := &logproto.QueryRequest{ + request := logproto.QueryRequest{ Query: query, + Limit: uint32(limit), + Start: start, + End: end, } - clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) { - return client.Query(ctx, req) - }) + result, err := q.Query(r.Context(), &request) if err != nil { - return nil, err - } - - iterators := make([]EntryIterator, len(clients)) - for i := range clients { - iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient)) - } - iterator := NewHeapIterator(iterators) - defer iterator.Close() - - return ReadBatch(iterator, limit) -} - -func ReadBatch(i EntryIterator, size int) (*logproto.QueryResponse, error) { - streams := map[string]*logproto.Stream{} - - for respSize := 0; respSize < size && i.Next(); respSize++ { - labels, entry := i.Labels(), i.Entry() - stream, ok := streams[labels] - if !ok { - stream = &logproto.Stream{ - Labels: labels, - } - streams[labels] = stream - } - stream.Entries = append(stream.Entries, entry) + http.Error(w, err.Error(), http.StatusBadRequest) + return } - result := logproto.QueryResponse{ - Streams: make([]*logproto.Stream, 0, len(streams)), - } - for _, stream := range streams { - result.Streams = append(result.Streams, stream) + if err := json.NewEncoder(w).Encode(result); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - return &result, i.Error() } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b86d456f04..c04eb20e01 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -89,6 +89,48 @@ func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, e return result, nil } +func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logproto.QueryResponse, error) { + clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) { + return client.Query(ctx, req) + }) + if err != nil { + return nil, err + } + + iterators := make([]EntryIterator, len(clients)) + for i := range clients { + iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient)) + } + iterator := NewHeapIterator(iterators) + defer iterator.Close() + + return ReadBatch(iterator, req.Limit) +} + +func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, error) { + streams := map[string]*logproto.Stream{} + + for respSize := uint32(0); respSize < size && i.Next(); respSize++ { + labels, entry := i.Labels(), i.Entry() + stream, ok := streams[labels] + if !ok { + stream = &logproto.Stream{ + Labels: labels, + } + streams[labels] = stream + } + stream.Entries = append(stream.Entries, entry) + } + + result := logproto.QueryResponse{ + Streams: make([]*logproto.Stream, 0, len(streams)), + } + for _, stream := range streams { + result.Streams = append(result.Streams, stream) + } + return &result, i.Error() +} + // Check implements the grpc healthcheck func (*Querier) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil