diff --git a/cmd/distributor/main.go b/cmd/distributor/main.go index b858dccb74..839d82623a 100644 --- a/cmd/distributor/main.go +++ b/cmd/distributor/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "net/http" log "github.com/sirupsen/logrus" "github.com/weaveworks/common/middleware" @@ -11,7 +12,6 @@ import ( "google.golang.org/grpc" "github.com/grafana/logish/pkg/distributor" - "github.com/grafana/logish/pkg/logproto" ) func main() { @@ -45,6 +45,6 @@ func main() { } defer server.Shutdown() - logproto.RegisterPusherServer(server.GRPC, distributor) + server.HTTP.Handle("/api/push", http.HandlerFunc(distributor.PushHandler)) server.Run() } diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 467684f0e1..74ffe01e83 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "net/http" log "github.com/sirupsen/logrus" "github.com/weaveworks/common/middleware" @@ -10,7 +11,6 @@ import ( "github.com/weaveworks/cortex/pkg/util" "google.golang.org/grpc" - "github.com/grafana/logish/pkg/logproto" "github.com/grafana/logish/pkg/querier" ) @@ -45,6 +45,6 @@ func main() { } defer server.Shutdown() - logproto.RegisterQuerierServer(server.GRPC, querier) + server.HTTP.Handle("/api/query", http.HandlerFunc(querier.QueryHandler)) server.Run() } diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go new file mode 100644 index 0000000000..a2c76daacd --- /dev/null +++ b/pkg/distributor/http.go @@ -0,0 +1,24 @@ +package distributor + +import ( + "net/http" + + "github.com/weaveworks/cortex/pkg/util" + + "github.com/grafana/logish/pkg/logproto" +) + +func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { + var req logproto.PushRequest + if _, err := util.ParseProtoRequest(r.Context(), r, &req, util.RawSnappy); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + _, err := d.Push(r.Context(), &req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 8f28e6463c..24cc0664cf 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -11,6 +11,8 @@ import ( "github.com/grafana/logish/pkg/querier" ) +const queryBatchSize = 128 + var ( ErrStreamMissing = errors.New("Stream missing") ) @@ -76,5 +78,22 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie iterator := querier.NewHeapIterator(iterators) defer iterator.Close() - return querier.SendBatches(iterator, queryServer) + return sendBatches(iterator, queryServer) +} + +func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer) error { + for { + batch, err := querier.ReadBatch(i, queryBatchSize) + if err != nil { + return err + } + + if len(batch.Streams) == 0 { + return nil + } + + if err := queryServer.Send(batch); err != nil { + return err + } + } } diff --git a/pkg/querier/http.go b/pkg/querier/http.go new file mode 100644 index 0000000000..46a5ed4ede --- /dev/null +++ b/pkg/querier/http.go @@ -0,0 +1,84 @@ +package querier + +import ( + "context" + "encoding/json" + "net/http" + "strconv" + + "github.com/grafana/logish/pkg/logproto" +) + +const ( + defaultQueryLimit = 100 +) + +func (q *Querier) QueryHandler(w http.ResponseWriter, r *http.Request) { + query := r.FormValue("query") + limitStr := r.FormValue("limit") + limit := defaultQueryLimit + if limitStr != "" { + var err error + limit, err = strconv.Atoi(limitStr) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + + result, err := q.Query(r.Context(), query, limit) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := json.NewEncoder(w).Encode(result); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } +} + +func (q *Querier) Query(ctx context.Context, query string, limit int) (*logproto.QueryResponse, error) { + req := &logproto.QueryRequest{ + Query: query, + } + clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) { + return client.Query(ctx, req) + }) + if err != nil { + return nil, err + } + + iterators := make([]EntryIterator, len(clients)) + for i := range clients { + iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient)) + } + iterator := NewHeapIterator(iterators) + defer iterator.Close() + + return ReadBatch(iterator, limit) +} + +func ReadBatch(i EntryIterator, size int) (*logproto.QueryResponse, error) { + streams := map[string]*logproto.Stream{} + + for respSize := 0; respSize < size && i.Next(); respSize++ { + labels, entry := i.Labels(), i.Entry() + stream, ok := streams[labels] + if !ok { + stream = &logproto.Stream{ + Labels: labels, + } + streams[labels] = stream + } + stream.Entries = append(stream.Entries, entry) + } + + result := logproto.QueryResponse{ + Streams: make([]*logproto.Stream, len(streams)), + } + for _, stream := range streams { + result.Streams = append(result.Streams, stream) + } + return &result, i.Error() +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7b58b3eb35..da761f3b76 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -13,8 +13,6 @@ import ( "github.com/grafana/logish/pkg/logproto" ) -const queryBatchSize = 128 - type Config struct { RemoteTimeout time.Duration ClientConfig client.Config @@ -43,24 +41,6 @@ func New(cfg Config, ring ring.ReadRing) (*Querier, error) { }, nil } -func (q *Querier) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { - clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) { - return client.Query(queryServer.Context(), req) - }) - if err != nil { - return err - } - - iterators := make([]EntryIterator, len(clients)) - for i := range clients { - iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient)) - } - i := NewHeapIterator(iterators) - defer i.Close() - - return SendBatches(i, queryServer) -} - // forAllIngesters runs f, in parallel, for all ingesters // TODO taken from Cortex, see if we can refactor out an usable interface. func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, error)) ([]interface{}, error) { @@ -114,37 +94,3 @@ type iteratorBatcher struct { iterator EntryIterator queryServer logproto.Querier_QueryServer } - -func SendBatches(i EntryIterator, queryServer logproto.Querier_QueryServer) error { - streams := map[string]*logproto.Stream{} - respSize := 0 - - for i.Next() { - labels, entry := i.Labels(), i.Entry() - stream, ok := streams[labels] - if !ok { - stream = &logproto.Stream{ - Labels: labels, - } - streams[labels] = stream - } - stream.Entries = append(stream.Entries, entry) - respSize++ - - if respSize > queryBatchSize { - queryResp := logproto.QueryResponse{ - Streams: make([]*logproto.Stream, len(streams)), - } - for _, stream := range streams { - queryResp.Streams = append(queryResp.Streams, stream) - } - if err := queryServer.Send(&queryResp); err != nil { - return err - } - streams = map[string]*logproto.Stream{} - respSize = 0 - } - } - - return i.Error() -}