diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 584a02ebd5..546d04b32f 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -150,7 +150,10 @@ func (q *query) Exec(ctx context.Context) (Result, error) { status := "200" if err != nil { status = "500" - if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) { + if errors.Is(err, ErrParse) || + errors.Is(err, ErrPipeline) || + errors.Is(err, ErrLimit) || + errors.Is(err, context.Canceled) { status = "400" } } diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 31ecbbdfb0..aacbdcf1cd 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -143,6 +143,7 @@ func (it *batchChunkIterator) loop(ctx context.Context) { } select { case <-ctx.Done(): + it.next <- &chunkBatch{err: ctx.Err()} close(it.next) return case it.next <- it.nextBatch(): diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 2422f7dffd..4dd76baeaa 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1493,6 +1493,35 @@ func Test_IsInvalidChunkError(t *testing.T) { } } +func TestBatchCancel(t *testing.T) { + chunk := func(from time.Time) *LazyChunk { + return newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }) + } + chunks := []*LazyChunk{ + chunk(from), chunk(from.Add(10 * time.Millisecond)), chunk(from.Add(30 * time.Millisecond)), + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + it, err := newLogBatchIterator(ctx, NilMetrics, chunks, 1, newMatchers(fooLabels), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now()) + require.NoError(t, err) + defer require.NoError(t, it.Close()) + for it.Next() { + } + require.Equal(t, context.Canceled, it.Error()) +} + var entry logproto.Entry func Benchmark_store_OverlappingChunks(b *testing.B) {