|
|
|
|
@ -1,6 +1,7 @@ |
|
|
|
|
package planner |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
@ -20,6 +21,8 @@ import ( |
|
|
|
|
"google.golang.org/grpc" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/v3/pkg/bloombuild/protos" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/chunkenc" |
|
|
|
|
iter "github.com/grafana/loki/v3/pkg/iter/v2" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/storage" |
|
|
|
|
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/storage/chunk/cache" |
|
|
|
|
@ -166,11 +169,36 @@ func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func genBlock(ref bloomshipper.BlockRef) bloomshipper.Block { |
|
|
|
|
func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { |
|
|
|
|
indexBuf := bytes.NewBuffer(nil) |
|
|
|
|
bloomsBuf := bytes.NewBuffer(nil) |
|
|
|
|
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) |
|
|
|
|
reader := v1.NewByteReader(indexBuf, bloomsBuf) |
|
|
|
|
|
|
|
|
|
blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0) |
|
|
|
|
|
|
|
|
|
builder, err := v1.NewBlockBuilder(blockOpts, writer) |
|
|
|
|
if err != nil { |
|
|
|
|
return bloomshipper.Block{}, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if _, err = builder.BuildFrom(iter.NewEmptyIter[v1.SeriesWithBlooms]()); err != nil { |
|
|
|
|
return bloomshipper.Block{}, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
block := v1.NewBlock(reader, v1.NewMetrics(nil)) |
|
|
|
|
|
|
|
|
|
buf := bytes.NewBuffer(nil) |
|
|
|
|
if err := v1.TarGz(buf, block.Reader()); err != nil { |
|
|
|
|
return bloomshipper.Block{}, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tarReader := bytes.NewReader(buf.Bytes()) |
|
|
|
|
|
|
|
|
|
return bloomshipper.Block{ |
|
|
|
|
BlockRef: ref, |
|
|
|
|
Data: &DummyReadSeekCloser{}, |
|
|
|
|
} |
|
|
|
|
Data: bloomshipper.ClosableReadSeekerAdapter{ReadSeeker: tarReader}, |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func Test_blockPlansForGaps(t *testing.T) { |
|
|
|
|
@ -612,7 +640,12 @@ func putMetas(bloomClient bloomshipper.Client, metas []bloomshipper.Meta) error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, block := range meta.Blocks { |
|
|
|
|
err := bloomClient.PutBlock(context.Background(), genBlock(block)) |
|
|
|
|
writtenBlock, err := genBlock(block) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err = bloomClient.PutBlock(context.Background(), writtenBlock) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
@ -826,6 +859,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { |
|
|
|
|
for _, tc := range []struct { |
|
|
|
|
name string |
|
|
|
|
originalMetas []bloomshipper.Meta |
|
|
|
|
newMetas []bloomshipper.Meta |
|
|
|
|
expectedUpToDateMetas []bloomshipper.Meta |
|
|
|
|
}{ |
|
|
|
|
{ |
|
|
|
|
@ -835,6 +869,8 @@ func Test_deleteOutdatedMetas(t *testing.T) { |
|
|
|
|
name: "only up to date metas", |
|
|
|
|
originalMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), |
|
|
|
|
}, |
|
|
|
|
newMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}), |
|
|
|
|
}, |
|
|
|
|
expectedUpToDateMetas: []bloomshipper.Meta{ |
|
|
|
|
@ -846,13 +882,52 @@ func Test_deleteOutdatedMetas(t *testing.T) { |
|
|
|
|
name: "outdated metas", |
|
|
|
|
originalMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 5, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 5)}), |
|
|
|
|
genMeta(6, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 10)}), |
|
|
|
|
}, |
|
|
|
|
newMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), |
|
|
|
|
}, |
|
|
|
|
expectedUpToDateMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}), |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
name: "new metas reuse blocks from outdated meta", |
|
|
|
|
originalMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{ // Outdated
|
|
|
|
|
genBlockRef(0, 5), // Reuse
|
|
|
|
|
genBlockRef(5, 10), // Delete
|
|
|
|
|
}), |
|
|
|
|
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{ // Outdated
|
|
|
|
|
genBlockRef(10, 20), // Reuse
|
|
|
|
|
}), |
|
|
|
|
genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ // Up to date
|
|
|
|
|
genBlockRef(20, 30), |
|
|
|
|
}), |
|
|
|
|
}, |
|
|
|
|
newMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ |
|
|
|
|
genBlockRef(0, 5), // Reused block
|
|
|
|
|
}), |
|
|
|
|
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ |
|
|
|
|
genBlockRef(5, 7), // New block
|
|
|
|
|
genBlockRef(7, 10), // New block
|
|
|
|
|
genBlockRef(10, 20), // Reused block
|
|
|
|
|
}), |
|
|
|
|
}, |
|
|
|
|
expectedUpToDateMetas: []bloomshipper.Meta{ |
|
|
|
|
genMeta(0, 5, []int{1}, []bloomshipper.BlockRef{ |
|
|
|
|
genBlockRef(0, 5), |
|
|
|
|
}), |
|
|
|
|
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{ |
|
|
|
|
genBlockRef(5, 7), |
|
|
|
|
genBlockRef(7, 10), |
|
|
|
|
genBlockRef(10, 20), |
|
|
|
|
}), |
|
|
|
|
genMeta(20, 30, []int{0}, []bloomshipper.BlockRef{ |
|
|
|
|
genBlockRef(20, 30), |
|
|
|
|
}), |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
} { |
|
|
|
|
t.Run(tc.name, func(t *testing.T) { |
|
|
|
|
logger := log.NewNopLogger() |
|
|
|
|
@ -867,9 +942,11 @@ func Test_deleteOutdatedMetas(t *testing.T) { |
|
|
|
|
bloomClient, err := planner.bloomStore.Client(testDay.ModelTime()) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Create original metas and blocks
|
|
|
|
|
// Create original/new metas and blocks
|
|
|
|
|
err = putMetas(bloomClient, tc.originalMetas) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
err = putMetas(bloomClient, tc.newMetas) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
// Get all metas
|
|
|
|
|
metas, err := planner.bloomStore.FetchMetas( |
|
|
|
|
@ -882,9 +959,9 @@ func Test_deleteOutdatedMetas(t *testing.T) { |
|
|
|
|
) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
removeLocFromMetasSources(metas) |
|
|
|
|
require.ElementsMatch(t, tc.originalMetas, metas) |
|
|
|
|
require.ElementsMatch(t, append(tc.originalMetas, tc.newMetas...), metas) |
|
|
|
|
|
|
|
|
|
upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.originalMetas, phasePlanning) |
|
|
|
|
upToDate, err := planner.deleteOutdatedMetasAndBlocks(context.Background(), testTable, "fakeTenant", tc.newMetas, tc.originalMetas, phasePlanning) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.ElementsMatch(t, tc.expectedUpToDateMetas, upToDate) |
|
|
|
|
|
|
|
|
|
@ -900,6 +977,13 @@ func Test_deleteOutdatedMetas(t *testing.T) { |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
removeLocFromMetasSources(metas) |
|
|
|
|
require.ElementsMatch(t, tc.expectedUpToDateMetas, metas) |
|
|
|
|
|
|
|
|
|
// Fetch all blocks from the metas
|
|
|
|
|
for _, meta := range metas { |
|
|
|
|
blocks, err := planner.bloomStore.FetchBlocks(context.Background(), meta.Blocks) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.Len(t, blocks, len(meta.Blocks)) |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|