Plumb start & end timestamps through the server.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>

Branch: pull/7/head
Author: Tom Wilkie, committed 7 years ago
Parent: ad2baa5109
Commit: a2c4df0416
Changed files:

  1. pkg/logproto/logproto.proto (3 lines changed)
  2. pkg/querier/http.go (93 lines changed)
  3. pkg/querier/querier.go (42 lines changed)

pkg/logproto/logproto.proto

@@ -24,6 +24,9 @@ message PushResponse {
 message QueryRequest {
   string query = 1;
   uint32 limit = 2;
+  google.protobuf.Timestamp start = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
+  google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
 }
 
 message QueryResponse {

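Because the new start and end fields carry (gogoproto.stdtime) = true and (gogoproto.nullable) = false, the generated Go struct exposes them as plain time.Time values rather than *timestamp.Timestamp pointers, which is what lets the handler below assign parsed query parameters to them directly. A minimal sketch of building such a request; the field names come from the handler code in this commit, everything else is illustrative:

package main

import (
    "fmt"
    "time"

    "github.com/grafana/logish/pkg/logproto"
)

func main() {
    // Start and End are ordinary time.Time values because of the
    // stdtime / non-nullable gogoproto options above.
    req := logproto.QueryRequest{
        Query: `{job="varlogs"}`, // illustrative query string
        Limit: 100,
        Start: time.Now().Add(-1 * time.Hour),
        End:   time.Now(),
    }
    fmt.Println(req.Start.Before(req.End)) // true
}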
pkg/querier/http.go

@@ -1,84 +1,79 @@
 package querier
 
 import (
-    "context"
     "encoding/json"
     "net/http"
+    "net/url"
     "strconv"
+    "time"
 
     "github.com/grafana/logish/pkg/logproto"
 )
 
 const (
     defaultQueryLimit = 100
+    defaulSince       = 1 * time.Hour
 )
 
+// intParam returns the named query-string parameter as an int, or def if it is absent.
+func intParam(values url.Values, name string, def int) (int, error) {
+    value := values.Get(name)
+    if value == "" {
+        return def, nil
+    }
+
+    return strconv.Atoi(value)
+}
+
+// unixTimeParam returns the named query-string parameter interpreted as Unix
+// seconds, or def if it is absent.
+func unixTimeParam(values url.Values, name string, def time.Time) (time.Time, error) {
+    value := values.Get(name)
+    if value == "" {
+        return def, nil
+    }
+
+    secs, err := strconv.ParseInt(value, 10, 64)
+    if err != nil {
+        return time.Time{}, err
+    }
+
+    return time.Unix(secs, 0), nil
+}
+
 func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
     params := r.URL.Query()
     query := params.Get("query")
 
-    limit := defaultQueryLimit
-    if limitStr := params.Get("limit"); limitStr != "" {
-        var err error
-        limit, err = strconv.Atoi(limitStr)
-        if err != nil {
-            http.Error(w, err.Error(), http.StatusBadRequest)
-            return
-        }
-    }
-
-    result, err := q.Query(r.Context(), query, limit)
+    limit, err := intParam(params, "limit", defaultQueryLimit)
+    if err != nil {
+        http.Error(w, err.Error(), http.StatusBadRequest)
+        return
+    }
+
+    now := time.Now()
+    start, err := unixTimeParam(params, "start", now.Add(-defaulSince))
+    if err != nil {
+        http.Error(w, err.Error(), http.StatusBadRequest)
+        return
+    }
+
+    end, err := unixTimeParam(params, "end", now)
     if err != nil {
         http.Error(w, err.Error(), http.StatusBadRequest)
         return
     }
 
-    if err := json.NewEncoder(w).Encode(result); err != nil {
+    request := logproto.QueryRequest{
+        Query: query,
+        Limit: uint32(limit),
+        Start: start,
+        End:   end,
+    }
+
+    result, err := q.Query(r.Context(), &request)
+    if err != nil {
         http.Error(w, err.Error(), http.StatusBadRequest)
         return
     }
-}
 
-func (q *Querier) Query(ctx context.Context, query string, limit int) (*logproto.QueryResponse, error) {
-    req := &logproto.QueryRequest{
-        Query: query,
-        Limit: uint32(limit),
-    }
-
-    clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
-        return client.Query(ctx, req)
-    })
-    if err != nil {
-        return nil, err
-    }
-
-    iterators := make([]EntryIterator, len(clients))
-    for i := range clients {
-        iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
-    }
-
-    iterator := NewHeapIterator(iterators)
-    defer iterator.Close()
-
-    return ReadBatch(iterator, limit)
-}
-
-func ReadBatch(i EntryIterator, size int) (*logproto.QueryResponse, error) {
-    streams := map[string]*logproto.Stream{}
-
-    for respSize := 0; respSize < size && i.Next(); respSize++ {
-        labels, entry := i.Labels(), i.Entry()
-        stream, ok := streams[labels]
-        if !ok {
-            stream = &logproto.Stream{
-                Labels: labels,
-            }
-            streams[labels] = stream
-        }
-        stream.Entries = append(stream.Entries, entry)
-    }
-
-    result := logproto.QueryResponse{
-        Streams: make([]*logproto.Stream, 0, len(streams)),
-    }
-    for _, stream := range streams {
-        result.Streams = append(result.Streams, stream)
-    }
-    return &result, i.Error()
+    if err := json.NewEncoder(w).Encode(result); err != nil {
+        http.Error(w, err.Error(), http.StatusBadRequest)
+        return
+    }
 }
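The handler now reads start and end from the query string as Unix seconds, defaulting to one hour ago and now respectively. A rough sketch of how a caller could build a matching request URL; the host, port, and /api/query path are placeholders rather than values taken from this change:

package main

import (
    "fmt"
    "net/url"
    "strconv"
    "time"
)

func main() {
    now := time.Now()

    params := url.Values{}
    params.Set("query", `{job="varlogs"}`) // illustrative query expression
    params.Set("limit", "100")
    // start and end are plain Unix seconds, matching unixTimeParam above.
    params.Set("start", strconv.FormatInt(now.Add(-time.Hour).Unix(), 10))
    params.Set("end", strconv.FormatInt(now.Unix(), 10))

    // Placeholder host and path; use whatever route QueryHandler is mounted on.
    fmt.Println("http://localhost:8080/api/query?" + params.Encode())
}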

pkg/querier/querier.go

@@ -89,6 +89,48 @@ func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, e
     return result, nil
 }
 
+// Query fans the request out to all ingesters and merges their result streams.
+func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logproto.QueryResponse, error) {
+    clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
+        return client.Query(ctx, req)
+    })
+    if err != nil {
+        return nil, err
+    }
+
+    iterators := make([]EntryIterator, len(clients))
+    for i := range clients {
+        iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
+    }
+
+    iterator := NewHeapIterator(iterators)
+    defer iterator.Close()
+
+    return ReadBatch(iterator, req.Limit)
+}
+
+// ReadBatch drains up to size entries from the iterator, grouped by stream labels.
+func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, error) {
+    streams := map[string]*logproto.Stream{}
+
+    for respSize := uint32(0); respSize < size && i.Next(); respSize++ {
+        labels, entry := i.Labels(), i.Entry()
+        stream, ok := streams[labels]
+        if !ok {
+            stream = &logproto.Stream{
+                Labels: labels,
+            }
+            streams[labels] = stream
+        }
+        stream.Entries = append(stream.Entries, entry)
+    }
+
+    result := logproto.QueryResponse{
+        Streams: make([]*logproto.Stream, 0, len(streams)),
+    }
+    for _, stream := range streams {
+        result.Streams = append(result.Streams, stream)
+    }
+    return &result, i.Error()
+}
+
 // Check implements the grpc healthcheck
 func (*Querier) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
     return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
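Query fans the request out to every ingester, merges the per-ingester streams through NewHeapIterator, and ReadBatch then collects up to Limit entries grouped by their label string. A test-style sketch of that grouping behaviour; the fake iterator implements only the Next/Labels/Entry/Error/Close methods the code above relies on, and the Entry field names (Timestamp, Line) are assumed from the logproto package rather than shown in this diff:

package querier

import (
    "testing"
    "time"

    "github.com/grafana/logish/pkg/logproto"
)

// fakeIterator is a hypothetical in-memory EntryIterator used only for this sketch.
type fakeIterator struct {
    labels  []string
    entries []logproto.Entry
    pos     int
}

func (f *fakeIterator) Next() bool            { f.pos++; return f.pos <= len(f.entries) }
func (f *fakeIterator) Labels() string        { return f.labels[f.pos-1] }
func (f *fakeIterator) Entry() logproto.Entry { return f.entries[f.pos-1] }
func (f *fakeIterator) Error() error          { return nil }
func (f *fakeIterator) Close() error          { return nil }

func TestReadBatchGroupsByLabels(t *testing.T) {
    it := &fakeIterator{
        labels: []string{`{job="a"}`, `{job="b"}`, `{job="a"}`},
        entries: []logproto.Entry{
            {Timestamp: time.Unix(1, 0), Line: "one"},
            {Timestamp: time.Unix(2, 0), Line: "two"},
            {Timestamp: time.Unix(3, 0), Line: "three"},
        },
    }

    // A size of 2 stops the batch after two entries, one per label set.
    resp, err := ReadBatch(it, 2)
    if err != nil {
        t.Fatal(err)
    }
    if len(resp.Streams) != 2 {
        t.Fatalf("expected 2 streams, got %d", len(resp.Streams))
    }
}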
