From 714e5d357b61faeb4ad4fe47498a023922a2bd45 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sat, 19 May 2018 10:09:53 +0100 Subject: [PATCH] Honor limit in ingesters Signed-off-by: Tom Wilkie --- pkg/ingester/instance.go | 12 ++++++++---- pkg/querier/querier.go | 11 ++++++----- pkg/util/math.go | 8 ++++++++ 3 files changed, 22 insertions(+), 9 deletions(-) create mode 100644 pkg/util/math.go diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index fce3e6a9fc..ec55e6f700 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/logish/pkg/logproto" "github.com/grafana/logish/pkg/parser" "github.com/grafana/logish/pkg/querier" + "github.com/grafana/logish/pkg/util" ) const queryBatchSize = 128 @@ -80,15 +81,17 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie iterator := querier.NewHeapIterator(iterators) defer iterator.Close() - return sendBatches(iterator, queryServer) + return sendBatches(iterator, queryServer, req.Limit) } -func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer) error { - for { - batch, err := querier.ReadBatch(i, queryBatchSize) +func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error { + sent := uint32(0) + for sent < limit { + batch, batchSize, err := querier.ReadBatch(i, util.MinUint32(queryBatchSize, limit-sent)) if err != nil { return err } + sent += batchSize if len(batch.Streams) == 0 { return nil @@ -98,4 +101,5 @@ func sendBatches(i querier.EntryIterator, queryServer logproto.Querier_QueryServ return err } } + return nil } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index c04eb20e01..132c2e06fd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -104,13 +104,14 @@ func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logpr iterator := NewHeapIterator(iterators) defer iterator.Close() - return ReadBatch(iterator, req.Limit) + resp, _, err := ReadBatch(iterator, req.Limit) + return resp, err } -func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, error) { +func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) { streams := map[string]*logproto.Stream{} - - for respSize := uint32(0); respSize < size && i.Next(); respSize++ { + respSize := uint32(0) + for ; respSize < size && i.Next(); respSize++ { labels, entry := i.Labels(), i.Entry() stream, ok := streams[labels] if !ok { @@ -128,7 +129,7 @@ func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, error) { for _, stream := range streams { result.Streams = append(result.Streams, stream) } - return &result, i.Error() + return &result, respSize, i.Error() } // Check implements the grpc healthcheck diff --git a/pkg/util/math.go b/pkg/util/math.go new file mode 100644 index 0000000000..cd97cc9286 --- /dev/null +++ b/pkg/util/math.go @@ -0,0 +1,8 @@ +package util + +func MinUint32(a, b uint32) uint32 { + if a < b { + return a + } + return b +}