Wire up query path with new parsers and iterators.

pull/1/head
Tom Wilkie 8 years ago
parent 69d9c9ab5d
commit bbf4552cc4
  1. 10
      pkg/ingester/ingester.go
  2. 42
      pkg/ingester/instance.go
  3. 13
      pkg/ingester/stream.go
  4. 1
      pkg/parser/parser_test.go
  5. 4
      pkg/querier/iterator.go
  6. 73
      pkg/querier/querier.go

@ -63,6 +63,16 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
return inst
}
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
instanceID, err := user.ExtractOrgID(queryServer.Context())
if err != nil {
return err
}
instance := i.getOrCreateInstance(instanceID)
return instance.Query(req, queryServer)
}
func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

@ -4,7 +4,15 @@ import (
"context"
"sync"
"github.com/pkg/errors"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/parser"
"github.com/grafana/logish/pkg/querier"
)
var (
ErrStreamMissing = errors.New("Stream missing")
)
type instance struct {
@ -25,11 +33,15 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
defer i.streamsMtx.Unlock()
for _, s := range req.Streams {
//labels.Validate
labels, err := parser.Labels(s.Labels)
if err != nil {
return err
}
stream, ok := i.streams[s.Labels]
if !ok {
stream = newStream()
stream = newStream(labels)
i.index.add(labels, s.Labels)
i.streams[s.Labels] = stream
}
@ -40,3 +52,29 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return nil
}
func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
matchers, err := parser.Matchers(req.Query)
if err != nil {
return err
}
// TODO: lock smell
i.streamsMtx.Lock()
ids := i.index.lookup(matchers)
iterators := make([]querier.EntryIterator, len(ids))
for j := range ids {
stream, ok := i.streams[ids[j]]
if !ok {
i.streamsMtx.Unlock()
return ErrStreamMissing
}
iterators[j] = stream.Iterator()
}
i.streamsMtx.Unlock()
iterator := querier.NewHeapIterator(iterators)
defer iterator.Close()
return querier.SendBatches(iterator, queryServer)
}

@ -3,6 +3,8 @@ package ingester
import (
"context"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/querier"
)
@ -13,10 +15,13 @@ type stream struct {
// Newest chunk at chunks[0].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []Chunk
labels labels.Labels
}
func newStream() *stream {
return &stream{}
func newStream(labels labels.Labels) *stream {
return &stream{
labels: labels,
}
}
func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
@ -40,13 +45,13 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
return nil
}
func (s *stream) Iterator(labels string) querier.EntryIterator {
func (s *stream) Iterator() querier.EntryIterator {
iterators := make([]querier.EntryIterator, len(s.chunks))
for i, c := range s.chunks {
iterators[i] = c.Iterator()
}
return &nonOverlappingIterator{
labels: labels,
labels: s.labels.String(),
iterators: iterators,
}
}

@ -21,6 +21,7 @@ func TestLex(t *testing.T) {
{`{ foo !~ "bar" }`, []int{MATCHERS, OPEN_BRACE, IDENTIFIER, NRE, STRING, CLOSE_BRACE}},
{`{ foo = "bar", bar != "baz" }`, []int{MATCHERS, OPEN_BRACE, IDENTIFIER, EQ, STRING,
COMMA, IDENTIFIER, NEQ, STRING, CLOSE_BRACE}},
{`{ foo = "ba\"r" }`, []int{MATCHERS, OPEN_BRACE, IDENTIFIER, EQ, STRING, CLOSE_BRACE}},
} {
t.Run(tc.input, func(t *testing.T) {
actual := []int{}

@ -77,7 +77,7 @@ type heapIterator struct {
curr EntryIterator
}
func newHeapIterator(is []EntryIterator) EntryIterator {
func NewHeapIterator(is []EntryIterator) EntryIterator {
result := &heapIterator{
iterators: make(iteratorHeap, 0, len(is)),
}
@ -137,7 +137,7 @@ func queryResponseIterator(resp *logproto.QueryResponse) EntryIterator {
for i := range resp.Streams {
is = append(is, newStreamIterator(resp.Streams[i]))
}
return newHeapIterator(is)
return NewHeapIterator(is)
}
type queryClientIterator struct {

@ -55,40 +55,10 @@ func (q *Querier) Query(req *logproto.QueryRequest, queryServer logproto.Querier
for i := range clients {
iterators[i] = newQueryClientIterator(clients[i].(logproto.Querier_QueryClient))
}
i := newHeapIterator(iterators)
i := NewHeapIterator(iterators)
defer i.Close()
streams := map[string]*logproto.Stream{}
respSize := 0
for i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
stream.Entries = append(stream.Entries, entry)
respSize++
if respSize > queryBatchSize {
queryResp := logproto.QueryResponse{
Streams: make([]*logproto.Stream, len(streams)),
}
for _, stream := range streams {
queryResp.Streams = append(queryResp.Streams, stream)
}
if err := queryServer.Send(&queryResp); err != nil {
return err
}
streams = map[string]*logproto.Stream{}
respSize = 0
}
}
return i.Error()
return SendBatches(i, queryServer)
}
// forAllIngesters runs f, in parallel, for all ingesters
@ -139,3 +109,42 @@ func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, e
func (*Querier) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}
type iteratorBatcher struct {
iterator EntryIterator
queryServer logproto.Querier_QueryServer
}
func SendBatches(i EntryIterator, queryServer logproto.Querier_QueryServer) error {
streams := map[string]*logproto.Stream{}
respSize := 0
for i.Next() {
labels, entry := i.Labels(), i.Entry()
stream, ok := streams[labels]
if !ok {
stream = &logproto.Stream{
Labels: labels,
}
streams[labels] = stream
}
stream.Entries = append(stream.Entries, entry)
respSize++
if respSize > queryBatchSize {
queryResp := logproto.QueryResponse{
Streams: make([]*logproto.Stream, len(streams)),
}
for _, stream := range streams {
queryResp.Streams = append(queryResp.Streams, stream)
}
if err := queryServer.Send(&queryResp); err != nil {
return err
}
streams = map[string]*logproto.Stream{}
respSize = 0
}
}
return i.Error()
}

Loading…
Cancel
Save