Add HTTP interface to distributor and querier.

pull/2/head
Tom Wilkie 8 years ago
parent eb4e99d928
commit 45807ba19b
  1. 4
      cmd/distributor/main.go
  2. 4
      cmd/querier/main.go
  3. 24
      pkg/distributor/http.go
  4. 21
      pkg/ingester/instance.go
  5. 84
      pkg/querier/http.go
  6. 54
      pkg/querier/querier.go

@ -2,6 +2,7 @@ package main
import (
"flag"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/middleware"
@ -11,7 +12,6 @@ import (
"google.golang.org/grpc"
"github.com/grafana/logish/pkg/distributor"
"github.com/grafana/logish/pkg/logproto"
)
func main() {
@ -45,6 +45,6 @@ func main() {
}
defer server.Shutdown()
logproto.RegisterPusherServer(server.GRPC, distributor)
server.HTTP.Handle("/api/push", http.HandlerFunc(distributor.PushHandler))
server.Run()
}

@ -2,6 +2,7 @@ package main
import (
"flag"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/middleware"
@ -10,7 +11,6 @@ import (
"github.com/weaveworks/cortex/pkg/util"
"google.golang.org/grpc"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
)
@ -45,6 +45,6 @@ func main() {
}
defer server.Shutdown()
logproto.RegisterQuerierServer(server.GRPC, querier)
server.HTTP.Handle("/api/query", http.HandlerFunc(querier.QueryHandler))
server.Run()
}

@ -0,0 +1,24 @@
package distributor
import (
"net/http"
"github.com/weaveworks/cortex/pkg/util"
"github.com/grafana/logish/pkg/logproto"
)
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
var req logproto.PushRequest
if _, err := util.ParseProtoRequest(r.Context(), r, &req, util.RawSnappy); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err := d.Push(r.Context(), &req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}

@ -11,6 +11,8 @@ import (
"github.com/grafana/logish/pkg/querier"
)
const queryBatchSize = 128
var (
ErrStreamMissing = errors.New("Stream missing")
)
@ -76,5 +78,22 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
iterator := querier.NewHeapIterator(iterators)
defer iterator.Close()
return querier.SendBatches(iterator, queryServer)
return sendBatches(iterator, queryServer)
}
func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer) error {
for {
batch, err := querier.ReadBatch(i, queryBatchSize)
if err != nil {
return err
}
if len(batch.Streams) == 0 {
return nil
}
if err := queryServer.Send(batch); err != nil {
return err
}
}
}

@ -0,0 +1,84 @@
package querier
import (
"context"
"encoding/json"
"net/http"
"strconv"
"github.com/grafana/logish/pkg/logproto"
)
const (
defaultQueryLimit = 100
)
func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) {
query := r.FormValue("query")
limitStr := r.FormValue("limit")
limit := defaultQueryLimit
if limitStr != "" {
var err error
limit, err = strconv.Atoi(limitStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
result, err := q.Query(r.Context(), query, limit)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := json.NewEncoder(w).Encode(result); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
func (q *Querier) Query(ctx context.Context, query string, limit int) (*logproto.QueryResponse, error) {
req := &logproto.QueryRequest{
Query: query,
}
clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(ctx, req)
})
if err != nil {
return nil, err
}
iterators := make([]EntryIterator, len(clients))
for i := range clients {
iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
}
iterator := NewHeapIterator(iterators)
defer iterator.Close()
return ReadBatch(iterator, limit)
}
func ReadBatch(i EntryIterator, size int) (*logproto.QueryResponse, error) {
streams := map[string]*logproto.Stream{}
for respSize := 0; respSize < size && i.Next(); respSize++ {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
stream.Entries = append(stream.Entries, entry)
}
result := logproto.QueryResponse{
Streams: make([]*logproto.Stream, len(streams)),
}
for _, stream := range streams {
result.Streams = append(result.Streams, stream)
}
return &result, i.Error()
}

@ -13,8 +13,6 @@ import (
"github.com/grafana/logish/pkg/logproto"
)
const queryBatchSize = 128
type Config struct {
RemoteTimeout time.Duration
ClientConfig client.Config
@ -43,24 +41,6 @@ func New(cfg Config, ring ring.ReadRing) (*Querier, error) {
}, nil
}
func (q *Querier) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(queryServer.Context(), req)
})
if err != nil {
return err
}
iterators := make([]EntryIterator, len(clients))
for i := range clients {
iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
}
i := NewHeapIterator(iterators)
defer i.Close()
return SendBatches(i, queryServer)
}
// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, error)) ([]interface{}, error) {
@ -114,37 +94,3 @@ type iteratorBatcher struct {
iterator EntryIterator
queryServer logproto.Querier_QueryServer
}
func SendBatches(i EntryIterator, queryServer logproto.Querier_QueryServer) error {
streams := map[string]*logproto.Stream{}
respSize := 0
for i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
stream.Entries = append(stream.Entries, entry)
respSize++
if respSize > queryBatchSize {
queryResp := logproto.QueryResponse{
Streams: make([]*logproto.Stream, len(streams)),
}
for _, stream := range streams {
queryResp.Streams = append(queryResp.Streams, stream)
}
if err := queryServer.Send(&queryResp); err != nil {
return err
}
streams = map[string]*logproto.Stream{}
respSize = 0
}
}
return i.Error()
}

Loading…
Cancel
Save