|
|
|
@ -184,7 +184,7 @@ func TestBlock(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
|
|
|
|
|
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
require.NoError(t, err) |
|
|
|
@ -212,7 +212,7 @@ func TestBlock(t *testing.T) { |
|
|
|
|
require.NoError(t, it.Close()) |
|
|
|
|
require.Equal(t, len(cases), idx) |
|
|
|
|
|
|
|
|
|
countExtractor = func() log.StreamSampleExtractor { |
|
|
|
|
countExtractor := func() log.StreamSampleExtractor { |
|
|
|
|
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
@ -276,6 +276,7 @@ func TestCorruptChunk(t *testing.T) { |
|
|
|
|
ctx, start, end := context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64) |
|
|
|
|
for i, c := range cases { |
|
|
|
|
chk.blocks = []block{{b: c.data}} |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
it, err := chk.Iterator(ctx, start, end, logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
require.NoError(t, err, "case %d", i) |
|
|
|
|
|
|
|
|
@ -309,6 +310,7 @@ func TestReadFormatV1(t *testing.T) { |
|
|
|
|
t.Fatal(err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal(err) |
|
|
|
@ -340,6 +342,7 @@ func TestRoundtripV2(t *testing.T) { |
|
|
|
|
|
|
|
|
|
assertLines := func(c *MemChunk) { |
|
|
|
|
require.Equal(t, enc, c.Encoding()) |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal(err) |
|
|
|
@ -529,6 +532,7 @@ func TestChunkFilling(t *testing.T) { |
|
|
|
|
|
|
|
|
|
require.Equal(t, int64(lines), i) |
|
|
|
|
|
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
i = 0 |
|
|
|
@ -711,6 +715,7 @@ func TestChunkStats(t *testing.T) { |
|
|
|
|
expectedSize := inserted * (len(entry.Line) + 3*binary.MaxVarintLen64) |
|
|
|
|
statsCtx, ctx := stats.NewContext(context.Background()) |
|
|
|
|
|
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal(err) |
|
|
|
@ -789,6 +794,7 @@ func TestIteratorClose(t *testing.T) { |
|
|
|
|
} { |
|
|
|
|
c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, testBlockSize, testTargetSize) |
|
|
|
|
inserted := fillChunk(c) |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline) |
|
|
|
|
if err != nil { |
|
|
|
|
t.Fatal(err) |
|
|
|
@ -916,6 +922,7 @@ func BenchmarkBackwardIterator(b *testing.B) { |
|
|
|
|
_ = fillChunk(c) |
|
|
|
|
b.ResetTimer() |
|
|
|
|
for n := 0; n < b.N; n++ { |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline) |
|
|
|
|
if err != nil { |
|
|
|
|
panic(err) |
|
|
|
@ -938,6 +945,7 @@ func TestGenerateDataSize(t *testing.T) { |
|
|
|
|
|
|
|
|
|
bytesRead := uint64(0) |
|
|
|
|
for _, c := range chunks { |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
|
|
|
|
|
iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
if err != nil { |
|
|
|
@ -977,6 +985,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { |
|
|
|
|
b.ResetTimer() |
|
|
|
|
|
|
|
|
|
for n := 0; n < b.N; n++ { |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
iter := h.Iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) |
|
|
|
|
|
|
|
|
|
for iter.Next() { |
|
|
|
@ -1061,6 +1070,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) { |
|
|
|
|
tt := tt |
|
|
|
|
c := createChunk() |
|
|
|
|
|
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
// testing headchunk
|
|
|
|
|
it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline) |
|
|
|
|
require.NoError(t, err) |
|
|
|
@ -1091,6 +1101,7 @@ func TestMemchunkLongLine(t *testing.T) { |
|
|
|
|
for i := 1; i <= 10; i++ { |
|
|
|
|
require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) |
|
|
|
|
} |
|
|
|
|
noopStreamPipeline := log.NewNoopPipeline().ForStream(labels.Labels{}) |
|
|
|
|
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
for i := 1; i <= 10; i++ { |
|
|
|
|