Reduce to a single code path through sendBatches regardless of limit (#6216)

* Reduce to a single code path through sendBatches regardless of limit specified

* simplify sendBatches by using -1 to specify no limit

* test fix
pull/6312/head
Mathew Heard 3 years ago committed by GitHub
parent 135f672c91
commit de418bf6ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/ingester/ingester.go
  2. 37
      pkg/ingester/instance.go
  3. 2
      pkg/ingester/instance_test.go

@ -615,7 +615,13 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
return sendBatches(ctx, it, queryServer, req.Limit)
// sendBatches uses -1 to specify no limit.
batchLimit := int32(req.Limit)
if batchLimit == 0 {
batchLimit = -1
}
return sendBatches(ctx, it, queryServer, batchLimit)
}
// QuerySample the ingesters for series from logs matching a set of matchers.

@ -716,38 +716,23 @@ type QuerierQueryServer interface {
Send(res *logproto.QueryResponse) error
}
func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit uint32) error {
func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
stats := stats.FromContext(ctx)
if limit == 0 {
// send all batches.
for !isDone(ctx) {
batch, size, err := iter.ReadBatch(i, queryBatchSize)
if err != nil {
return err
}
if len(batch.Streams) == 0 {
return nil
}
stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()
if err := queryServer.Send(batch); err != nil {
return err
}
stats.Reset()
}
return nil
}
// send until the limit is reached.
sent := uint32(0)
for sent < limit && !isDone(queryServer.Context()) {
batch, batchSize, err := iter.ReadBatch(i, math.MinUint32(queryBatchSize, limit-sent))
for limit != 0 && !isDone(ctx) {
fetchSize := uint32(queryBatchSize)
if limit > 0 {
fetchSize = math.MinUint32(queryBatchSize, uint32(limit))
}
batch, batchSize, err := iter.ReadBatch(i, fetchSize)
if err != nil {
return err
}
sent += batchSize
if limit > 0 {
limit -= int32(batchSize)
}
if len(batch.Streams) == 0 {
return nil

@ -513,7 +513,7 @@ func Test_Iterator(t *testing.T) {
return nil
},
),
uint32(2)),
int32(2)),
)
require.Equal(t, 2, len(res.Streams))
// each entry translated into a unique stream

Loading…
Cancel
Save