chore: Clarify compression package (#14257)

This PR renames "encoding" to "codec" in the compression package to remove the cognitive dissonance.

It also removes the `Enc` prefix from codec identifiers, so that they adhere to Go's naming best practices, e.g. `compression.EncGZIP` becomes `compression.GZIP` when used in a different package.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/14308/head
Christian Haudum 2 years ago committed by GitHub
parent 84788ad2ae
commit f52f8ad540
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      pkg/bloombuild/builder/builder.go
  2. 2
      pkg/bloombuild/builder/spec_test.go
  3. 2
      pkg/bloombuild/common/tsdb.go
  4. 4
      pkg/bloombuild/planner/planner_test.go
  5. 2
      pkg/chunkenc/dumb_chunk.go
  6. 2
      pkg/chunkenc/interface.go
  7. 14
      pkg/chunkenc/memchunk.go
  8. 56
      pkg/chunkenc/memchunk_test.go
  9. 16
      pkg/chunkenc/unordered_test.go
  10. 2
      pkg/chunkenc/util_test.go
  11. 2
      pkg/compactor/deletion/delete_requests_table.go
  12. 2
      pkg/compactor/index_set.go
  13. 2
      pkg/compactor/retention/retention_test.go
  14. 85
      pkg/compression/codec.go
  15. 6
      pkg/compression/codec_test.go
  16. 83
      pkg/compression/encoding.go
  17. 30
      pkg/compression/fileext.go
  18. 24
      pkg/compression/pool.go
  19. 2
      pkg/compression/pool_test.go
  20. 2
      pkg/ingester/checkpoint_test.go
  21. 2
      pkg/ingester/chunk_test.go
  22. 4
      pkg/ingester/encoding_test.go
  23. 2
      pkg/ingester/flush_test.go
  24. 28
      pkg/ingester/ingester.go
  25. 18
      pkg/ingester/ingester_test.go
  26. 2
      pkg/ingester/stream_test.go
  27. 4
      pkg/storage/bloom/v1/archive.go
  28. 24
      pkg/storage/bloom/v1/archive_test.go
  29. 2
      pkg/storage/bloom/v1/bloom.go
  30. 8
      pkg/storage/bloom/v1/bloom_tokenizer_test.go
  31. 2
      pkg/storage/bloom/v1/builder.go
  32. 22
      pkg/storage/bloom/v1/builder_test.go
  33. 10
      pkg/storage/bloom/v1/fuse_test.go
  34. 8
      pkg/storage/bloom/v1/schema.go
  35. 2
      pkg/storage/bloom/v1/test_util.go
  36. 4
      pkg/storage/bloom/v1/versioned_builder_test.go
  37. 2
      pkg/storage/chunk/cache/cache_test.go
  38. 2
      pkg/storage/chunk/client/grpc/grpc_client_test.go
  39. 2
      pkg/storage/chunk/client/testutils/testutils.go
  40. 2
      pkg/storage/chunk/fetcher/fetcher_test.go
  41. 4
      pkg/storage/hack/main.go
  42. 2
      pkg/storage/store_test.go
  43. 2
      pkg/storage/stores/series/series_store_test.go
  44. 2
      pkg/storage/stores/series_store_write_test.go
  45. 12
      pkg/storage/stores/shipper/bloomshipper/client.go
  46. 36
      pkg/storage/stores/shipper/bloomshipper/client_test.go
  47. 6
      pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
  48. 10
      pkg/storage/stores/shipper/bloomshipper/resolver.go
  49. 26
      pkg/storage/stores/shipper/bloomshipper/resolver_test.go
  50. 4
      pkg/storage/stores/shipper/bloomshipper/store_test.go
  51. 2
      pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go
  52. 2
      pkg/storage/stores/shipper/indexshipper/uploads/index_set.go
  53. 2
      pkg/storage/util_test.go
  54. 2
      pkg/validation/limits.go
  55. 2
      pkg/validation/limits_test.go
  56. 2
      tools/tsdb/migrate-versions/main.go

@ -34,7 +34,7 @@ import (
)
// TODO(chaudum): Make configurable via (per-tenant?) setting.
var blockCompressionAlgo = compression.EncNone
var defaultBlockCompressionCodec = compression.None
type Builder struct {
services.Service
@ -336,7 +336,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to get client: %w", err)
}
blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
blockEnc, err := compression.ParseCodec(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
}
@ -407,7 +407,7 @@ func (b *Builder) processTask(
blockCt++
blk := newBlocks.At()
built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk)
built, err := bloomshipper.BlockFrom(defaultBlockCompressionCodec, tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {

@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser
func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} {
for _, enc := range []compression.Codec{compression.None, compression.GZIP, compression.Snappy} {
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions

@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB(
}
defer data.Close()
decompressorPool := compression.GetReaderPool(compression.EncGZIP)
decompressorPool := compression.GetReaderPool(compression.GZIP)
decompressor, err := decompressorPool.GetReader(data)
if err != nil {
return nil, errors.Wrap(err, "failed to get decompressor")

@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)
blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0)
blockOpts := v1.NewBlockOptions(compression.None, 0, 0)
builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
block := v1.NewBlock(reader, v1.NewMetrics(nil))
buf := bytes.NewBuffer(nil)
if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil {
if err := v1.TarCompress(ref.Codec, buf, block.Reader()); err != nil {
return bloomshipper.Block{}, err
}

@ -70,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}
func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone }
func (c *dumbChunk) Encoding() compression.Codec { return compression.None }
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).

@ -68,7 +68,7 @@ type Chunk interface {
UncompressedSize() int
CompressedSize() int
Close() error
Encoding() compression.Encoding
Encoding() compression.Codec
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

@ -132,7 +132,7 @@ type MemChunk struct {
head HeadBlock
format byte
encoding compression.Encoding
encoding compression.Codec
headFmt HeadBlockFmt
// compressed size of chunk. Set when chunk is cut or while decoding chunk from storage.
@ -355,7 +355,7 @@ type entry struct {
}
// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
func NewMemChunk(chunkFormat byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}
@ -370,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
}
// NewMemChunk returns a new in-mem chunk.
func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
func newMemChunkWithFormat(format byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
panicIfInvalidFormat(format, head)
symbolizer := newSymbolizer()
@ -414,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
bc.format = version
switch version {
case ChunkFormatV1:
bc.encoding = compression.EncGZIP
bc.encoding = compression.GZIP
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
enc := compression.Encoding(db.byte())
enc := compression.Codec(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
}
@ -777,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt
}
// Encoding implements Chunk.
func (c *MemChunk) Encoding() compression.Encoding {
func (c *MemChunk) Encoding() compression.Codec {
return c.encoding
}
@ -1173,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
type encBlock struct {
enc compression.Encoding
enc compression.Codec
format byte
symbolizer *symbolizer
block

@ -32,16 +32,16 @@ import (
"github.com/grafana/loki/v3/pkg/util/filter"
)
var testEncodings = []compression.Encoding{
compression.EncNone,
compression.EncGZIP,
compression.EncLZ4_64k,
compression.EncLZ4_256k,
compression.EncLZ4_1M,
compression.EncLZ4_4M,
compression.EncSnappy,
compression.EncFlate,
compression.EncZstd,
var testEncodings = []compression.Codec{
compression.None,
compression.GZIP,
compression.LZ4_64k,
compression.LZ4_256k,
compression.LZ4_1M,
compression.LZ4_4M,
compression.Snappy,
compression.Flate,
compression.Zstd,
}
var (
@ -299,7 +299,7 @@ func TestCorruptChunk(t *testing.T) {
func TestReadFormatV1(t *testing.T) {
t.Parallel()
c := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
// overrides to v1 for testing that specific version.
c.format = ChunkFormatV1
@ -391,7 +391,7 @@ func TestRoundtripV2(t *testing.T) {
}
}
func testNameWithFormats(enc compression.Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string {
func testNameWithFormats(enc compression.Codec, chunkFormat byte, headBlockFmt HeadBlockFmt) string {
return fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt)
}
@ -558,7 +558,7 @@ func TestChunkFilling(t *testing.T) {
func TestGZIPChunkTargetSize(t *testing.T) {
t.Parallel()
chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
lineSize := 512
entry := &logproto.Entry{
@ -681,7 +681,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
tester(t, NewMemChunk(ChunkFormatV3, compression.EncGZIP, f, testBlockSize, testTargetSize))
tester(t, NewMemChunk(ChunkFormatV3, compression.GZIP, f, testBlockSize, testTargetSize))
})
}
}
@ -726,7 +726,7 @@ func TestChunkSize(t *testing.T) {
}
func TestChunkStats(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, testBlockSize, 0)
first := time.Now()
entry := &logproto.Entry{
Timestamp: first,
@ -968,7 +968,7 @@ func BenchmarkBackwardIterator(b *testing.B) {
for _, bs := range testBlockSizes {
b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, DefaultTestHeadBlockFmt, bs, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
@ -1082,7 +1082,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
func TestMemChunk_IteratorBounds(t *testing.T) {
createChunk := func() *MemChunk {
t.Helper()
c := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6)
c := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, 1e6, 1e6)
if _, err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
@ -1168,9 +1168,9 @@ func TestMemchunkLongLine(t *testing.T) {
func TestBytesWith(t *testing.T) {
t.Parallel()
exp, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
exp, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
require.Nil(t, err)
out, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
out, err := NewMemChunk(ChunkFormatV3, compression.None, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
require.Nil(t, err)
require.Equal(t, exp, out)
@ -1181,8 +1181,8 @@ func TestCheckpointEncoding(t *testing.T) {
blockSize, targetSize := 256*1024, 1500*1024
for _, f := range allPossibleFormats {
t.Run(testNameWithFormats(compression.EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(f.chunkFormat, compression.EncSnappy, f.headBlockFmt, blockSize, targetSize)
t.Run(testNameWithFormats(compression.Snappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(f.chunkFormat, compression.Snappy, f.headBlockFmt, blockSize, targetSize)
// add a few entries
for i := 0; i < 5; i++ {
@ -1267,7 +1267,7 @@ var (
func BenchmarkBufferedIteratorLabels(b *testing.B) {
for _, f := range HeadBlockFmts {
b.Run(f.String(), func(b *testing.B) {
c := NewMemChunk(ChunkFormatV3, compression.EncSnappy, f, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV3, compression.Snappy, f, testBlockSize, testTargetSize)
_ = fillChunk(c)
labelsSet := []labels.Labels{
@ -1367,8 +1367,8 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
func Test_HeadIteratorReverse(t *testing.T) {
for _, testData := range allPossibleFormats {
t.Run(testNameWithFormats(compression.EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(testData.chunkFormat, compression.EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize)
t.Run(testNameWithFormats(compression.Snappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) {
c := newMemChunkWithFormat(testData.chunkFormat, compression.Snappy, testData.headBlockFmt, testBlockSize, testTargetSize)
genEntry := func(i int64) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, i),
@ -1483,7 +1483,7 @@ func TestMemChunk_Rebound(t *testing.T) {
}
func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {
chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
chk := NewMemChunk(ChunkFormatV3, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
for ; from.Before(through); from = from.Add(time.Second) {
_, err := chk.Append(&logproto.Entry{
Line: from.String(),
@ -1604,7 +1604,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
}
func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withStructuredMetadata bool) *MemChunk {
chk := NewMemChunk(ChunkFormatV4, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
chk := NewMemChunk(ChunkFormatV4, compression.GZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
t.Logf("from : %v", from.String())
t.Logf("through: %v", through.String())
var structuredMetadata push.LabelsAdapter
@ -1753,7 +1753,7 @@ func TestMemChunk_SpaceFor(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
for _, format := range allPossibleFormats {
t.Run(fmt.Sprintf("chunk_v%d_head_%s", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
chk := newMemChunkWithFormat(format.chunkFormat, compression.EncNone, format.headBlockFmt, 1024, tc.targetSize)
chk := newMemChunkWithFormat(format.chunkFormat, compression.None, format.headBlockFmt, 1024, tc.targetSize)
chk.blocks = make([]block, tc.nBlocks)
chk.cutBlockSize = tc.cutBlockSize
@ -2055,7 +2055,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) {
t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ {
t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) {
chk := NewMemChunk(format.chunkFormat, compression.EncNone, format.headBlockFmt, blockSize, testTargetSize)
chk := NewMemChunk(format.chunkFormat, compression.None, format.headBlockFmt, blockSize, testTargetSize)
ts := time.Now().Unix()
for i := 0; i < 3; i++ {
dup, err := chk.Append(&logproto.Entry{

@ -451,7 +451,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
}
func TestUnorderedChunkIterators(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for i := 0; i < 100; i++ {
// push in reverse order
dup, err := c.Append(&logproto.Entry{
@ -497,11 +497,11 @@ func TestUnorderedChunkIterators(t *testing.T) {
}
func BenchmarkUnorderedRead(b *testing.B) {
legacy := NewMemChunk(ChunkFormatV3, compression.EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
legacy := NewMemChunk(ChunkFormatV3, compression.Snappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(legacy, false)
ordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
ordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(ordered, false)
unordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
unordered := NewMemChunk(ChunkFormatV3, compression.Snappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(unordered, false)
tcs := []struct {
@ -559,7 +559,7 @@ func BenchmarkUnorderedRead(b *testing.B) {
}
func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(c, false)
ct := 0
@ -596,7 +596,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
}
func chunkFrom(xs []logproto.Entry) ([]byte, error) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range xs {
if _, err := c.Append(&x); err != nil {
return nil, err
@ -656,7 +656,7 @@ func TestReorder(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range tc.input {
dup, err := c.Append(&x)
require.False(t, dup)
@ -675,7 +675,7 @@ func TestReorder(t *testing.T) {
}
func TestReorderAcrossBlocks(t *testing.T) {
c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
c := NewMemChunk(ChunkFormatV4, compression.Snappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize)
for _, batch := range [][]int{
// ensure our blocks have overlapping bounds and must be reordered
// before closing.

@ -24,7 +24,7 @@ func logprotoEntryWithStructuredMetadata(ts int64, line string, structuredMetada
}
}
func generateData(enc compression.Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
func generateData(enc compression.Codec, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) {
chunks := []Chunk{}
i := int64(0)
size := uint64(0)

@ -117,7 +117,7 @@ func (t *deleteRequestsTable) uploadFile() error {
}()
err = t.db.View(func(tx *bbolt.Tx) (err error) {
gzipPool := compression.GetWriterPool(compression.EncGZIP)
gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)

@ -229,7 +229,7 @@ func (is *indexSet) upload() error {
}
}()
gzipPool := compression.GetWriterPool(compression.EncGZIP)
gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)

@ -279,7 +279,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := ingesterclient.Fingerprint(lbs)
chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)
chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)
for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
dup, err := chunkEnc.Append(&logproto.Entry{

@ -0,0 +1,85 @@
package compression
import (
"fmt"
"strings"
)
// Codec identifies an available compression codec.
type Codec byte

// The different available codecs.
// Make sure to preserve the order, as the numeric values are serialized!
//
//nolint:revive
const (
	None Codec = iota
	GZIP
	Dumb // not supported
	LZ4_64k
	Snappy
	LZ4_256k
	LZ4_1M
	LZ4_4M
	Flate
	Zstd
)

// supportedCodecs lists the codecs that can be used for compression.
// Dumb is intentionally absent (not supported).
var supportedCodecs = []Codec{
	None,
	GZIP,
	LZ4_64k,
	Snappy,
	LZ4_256k,
	LZ4_1M,
	LZ4_4M,
	Flate,
	Zstd,
}

// String implements fmt.Stringer and returns the canonical serialized name
// of the codec. Unknown values (including Dumb) yield "unknown".
func (c Codec) String() string {
	switch c {
	case None:
		return "none"
	case GZIP:
		return "gzip"
	case LZ4_64k:
		return "lz4-64k"
	case LZ4_256k:
		return "lz4-256k"
	case LZ4_1M:
		return "lz4-1M"
	case LZ4_4M:
		// Historical quirk: the plain "lz4" label maps to the 4M variant.
		return "lz4"
	case Snappy:
		return "snappy"
	case Flate:
		return "flate"
	case Zstd:
		return "zstd"
	default:
		return "unknown"
	}
}

// ParseCodec parses a chunk encoding (compression codec) by its name.
// Matching is case-insensitive. The error text keeps the historical
// "invalid encoding" wording for compatibility with existing callers/tests.
func ParseCodec(enc string) (Codec, error) {
	for _, c := range supportedCodecs {
		if strings.EqualFold(c.String(), enc) {
			return c, nil
		}
	}
	return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedCodecs())
}

// SupportedCodecs returns a comma-separated list of the names of all
// supported codecs.
func SupportedCodecs() string {
	names := make([]string, 0, len(supportedCodecs))
	for _, c := range supportedCodecs {
		names = append(names, c.String())
	}
	return strings.Join(names, ", ")
}

@ -5,15 +5,15 @@ import "testing"
func TestParseEncoding(t *testing.T) {
tests := []struct {
enc string
want Encoding
want Codec
wantErr bool
}{
{"gzip", EncGZIP, false},
{"gzip", GZIP, false},
{"bad", 0, true},
}
for _, tt := range tests {
t.Run(tt.enc, func(t *testing.T) {
got, err := ParseEncoding(tt.enc)
got, err := ParseCodec(tt.enc)
if (err != nil) != tt.wantErr {
t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr)
return

@ -1,83 +0,0 @@
package compression
import (
"fmt"
"strings"
)
// Encoding identifies an available compression type.
type Encoding byte
// The different available encodings.
// Make sure to preserve the order, as the numeric values are serialized!
const (
EncNone Encoding = iota
EncGZIP
EncDumb // not supported
EncLZ4_64k
EncSnappy
EncLZ4_256k
EncLZ4_1M
EncLZ4_4M
EncFlate
EncZstd
)
var supportedEncoding = []Encoding{
EncNone,
EncGZIP,
EncLZ4_64k,
EncSnappy,
EncLZ4_256k,
EncLZ4_1M,
EncLZ4_4M,
EncFlate,
EncZstd,
}
func (e Encoding) String() string {
switch e {
case EncGZIP:
return "gzip"
case EncNone:
return "none"
case EncLZ4_64k:
return "lz4-64k"
case EncLZ4_256k:
return "lz4-256k"
case EncLZ4_1M:
return "lz4-1M"
case EncLZ4_4M:
return "lz4"
case EncSnappy:
return "snappy"
case EncFlate:
return "flate"
case EncZstd:
return "zstd"
default:
return "unknown"
}
}
// ParseEncoding parses an chunk encoding (compression algorithm) by its name.
func ParseEncoding(enc string) (Encoding, error) {
for _, e := range supportedEncoding {
if strings.EqualFold(e.String(), enc) {
return e, nil
}
}
return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())
}
// SupportedEncoding returns the list of supported Encoding.
func SupportedEncoding() string {
var sb strings.Builder
for i := range supportedEncoding {
sb.WriteString(supportedEncoding[i].String())
if i != len(supportedEncoding)-1 {
sb.WriteString(", ")
}
}
return sb.String()
}

@ -11,39 +11,39 @@ const (
ExtZstd = ".zst"
)
func ToFileExtension(e Encoding) string {
func ToFileExtension(e Codec) string {
switch e {
case EncNone:
case None:
return ExtNone
case EncGZIP:
case GZIP:
return ExtGZIP
case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M:
case LZ4_64k, LZ4_256k, LZ4_1M, LZ4_4M:
return ExtLZ4
case EncSnappy:
case Snappy:
return ExtSnappy
case EncFlate:
case Flate:
return ExtFlate
case EncZstd:
case Zstd:
return ExtZstd
default:
panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding()))
panic(fmt.Sprintf("invalid codec: %d, supported: %s", e, SupportedCodecs()))
}
}
func FromFileExtension(ext string) Encoding {
func FromFileExtension(ext string) Codec {
switch ext {
case ExtNone:
return EncNone
return None
case ExtGZIP:
return EncGZIP
return GZIP
case ExtLZ4:
return EncLZ4_4M
return LZ4_4M
case ExtSnappy:
return EncSnappy
return Snappy
case ExtFlate:
return EncFlate
return Flate
case ExtZstd:
return EncZstd
return Zstd
default:
panic(fmt.Sprintf("invalid file extension: %s", ext))
}

@ -51,33 +51,33 @@ var (
noop = NoopPool{}
)
func GetWriterPool(enc Encoding) WriterPool {
func GetWriterPool(enc Codec) WriterPool {
return GetPool(enc).(WriterPool)
}
func GetReaderPool(enc Encoding) ReaderPool {
func GetReaderPool(enc Codec) ReaderPool {
return GetPool(enc).(ReaderPool)
}
func GetPool(enc Encoding) ReaderWriterPool {
func GetPool(enc Codec) ReaderWriterPool {
switch enc {
case EncGZIP:
case GZIP:
return &gzip
case EncLZ4_64k:
case LZ4_64k:
return &lz4_64k
case EncLZ4_256k:
case LZ4_256k:
return &lz4_256k
case EncLZ4_1M:
case LZ4_1M:
return &lz4_1M
case EncLZ4_4M:
case LZ4_4M:
return &lz4_4M
case EncSnappy:
case Snappy:
return &snappy
case EncNone:
case None:
return &noop
case EncFlate:
case Flate:
return &flate
case EncZstd:
case Zstd:
return &zstd
default:
panic("unknown encoding")

@ -15,7 +15,7 @@ import (
)
func TestPool(t *testing.T) {
for _, enc := range supportedEncoding {
for _, enc := range supportedCodecs {
enc := enc
t.Run(enc.String(), func(t *testing.T) {
var wg sync.WaitGroup

@ -566,7 +566,7 @@ func buildChunks(t testing.TB, size int) []Chunk {
for i := 0; i < size; i++ {
// build chunks of 256k blocks, 1.5MB target size. Same as default config.
c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024)
c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.GZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024)
fillChunk(t, c)
descs = append(descs, chunkDesc{
chunk: c,

@ -50,7 +50,7 @@ func TestIterator(t *testing.T) {
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk {
return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {

@ -59,7 +59,7 @@ func Test_EncodingChunks(t *testing.T) {
t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) {
conf := tc.conf
c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
fillChunk(t, c)
if close {
require.Nil(t, c.Close())
@ -122,7 +122,7 @@ func Test_EncodingChunks(t *testing.T) {
func Test_EncodingCheckpoint(t *testing.T) {
conf := dummyConf()
c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
dup, err := c.Append(&logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "hi there",

@ -189,7 +189,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc {
for i := range res {
res[i] = &chunkDesc{
closed: true,
chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
}
fillChunk(t, res[i].chunk)
require.NoError(t, res[i].chunk.Close())

@ -89,18 +89,18 @@ var (
type Config struct {
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
parsedEncoding compression.Encoding `yaml:"-"` // placeholder for validated encoding
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
parsedEncoding compression.Codec `yaml:"-"` // placeholder for validated encoding
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"`
// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
SyncPeriod time.Duration `yaml:"sync_period"`
@ -150,7 +150,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB
f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedEncoding()))
f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.GZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs()))
f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 1*time.Hour, "Parameters used to synchronize ingesters to cut chunks at the same moment. Sync period is used to roll over incoming entry to a new chunk. If chunk's utilization isn't high enough (eg. less than 50% when sync_min_utilization is set to 0.5), then this chunk rollover doesn't happen.")
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0.1, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "The maximum number of errors a stream will report to the user when a push fails. 0 to make unlimited.")
@ -164,7 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}
func (cfg *Config) Validate() error {
enc, err := compression.ParseEncoding(cfg.ChunkEncoding)
enc, err := compression.ParseCodec(cfg.ChunkEncoding)
if err != nil {
return err
}

@ -697,7 +697,7 @@ func TestValidate(t *testing.T) {
}{
{
in: Config{
ChunkEncoding: compression.EncGZIP.String(),
ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@ -708,7 +708,7 @@ func TestValidate(t *testing.T) {
MaxChunkAge: time.Minute,
},
expected: Config{
ChunkEncoding: compression.EncGZIP.String(),
ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@ -717,12 +717,12 @@ func TestValidate(t *testing.T) {
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
parsedEncoding: compression.EncGZIP,
parsedEncoding: compression.GZIP,
},
},
{
in: Config{
ChunkEncoding: compression.EncSnappy.String(),
ChunkEncoding: compression.Snappy.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@ -732,7 +732,7 @@ func TestValidate(t *testing.T) {
IndexShards: index.DefaultIndexShards,
},
expected: Config{
ChunkEncoding: compression.EncSnappy.String(),
ChunkEncoding: compression.Snappy.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@ -740,7 +740,7 @@ func TestValidate(t *testing.T) {
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
parsedEncoding: compression.EncSnappy,
parsedEncoding: compression.Snappy,
},
},
{
@ -758,7 +758,7 @@ func TestValidate(t *testing.T) {
},
{
in: Config{
ChunkEncoding: compression.EncGZIP.String(),
ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@ -771,7 +771,7 @@ func TestValidate(t *testing.T) {
},
{
in: Config{
ChunkEncoding: compression.EncGZIP.String(),
ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
@ -784,7 +784,7 @@ func TestValidate(t *testing.T) {
},
{
in: Config{
ChunkEncoding: compression.EncGZIP.String(),
ChunkEncoding: compression.GZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,

@ -277,7 +277,7 @@ func TestStreamIterator(t *testing.T) {
{"gzipChunk", func() *chunkenc.MemChunk {
chunkfmt, headfmt := defaultChunkFormat(t)
return chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0)
return chunkenc.NewMemChunk(chunkfmt, compression.GZIP, headfmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {

@ -21,7 +21,7 @@ type TarEntry struct {
Body io.ReadSeeker
}
func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) error {
func TarCompress(enc compression.Codec, dst io.Writer, reader BlockReader) error {
comprPool := compression.GetWriterPool(enc)
comprWriter := comprPool.GetWriter(dst)
defer func() {
@ -61,7 +61,7 @@ func Tar(dst io.Writer, reader BlockReader) error {
return itr.Err()
}
func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error {
func UnTarCompress(enc compression.Codec, dst string, r io.Reader) error {
comprPool := compression.GetReaderPool(enc)
comprReader, err := comprPool.GetReader(r)
if err != nil {

@ -24,7 +24,7 @@ func TestArchive(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncNone,
encoding: compression.None,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@ -82,17 +82,17 @@ func TestArchive(t *testing.T) {
func TestArchiveCompression(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
enc compression.Encoding
enc compression.Codec
}{
{compression.EncNone},
{compression.EncGZIP},
{compression.EncSnappy},
{compression.EncLZ4_64k},
{compression.EncLZ4_256k},
{compression.EncLZ4_1M},
{compression.EncLZ4_4M},
{compression.EncFlate},
{compression.EncZstd},
{compression.None},
{compression.GZIP},
{compression.Snappy},
{compression.LZ4_64k},
{compression.LZ4_256k},
{compression.LZ4_1M},
{compression.LZ4_4M},
{compression.Flate},
{compression.Zstd},
} {
t.Run(tc.enc.String(), func(t *testing.T) {
// for writing files to two dirs for comparison and ensuring they're equal
@ -106,7 +106,7 @@ func TestArchiveCompression(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncNone,
encoding: compression.None,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,

@ -316,7 +316,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator,
return nil, false, errors.Wrap(err, "seeking to bloom page")
}
if b.schema.encoding == compression.EncNone {
if b.schema.encoding == compression.None {
res, err = LazyDecodeBloomPageNoCompression(r, alloc, page)
} else {
res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page)

@ -38,7 +38,7 @@ func TestTokenizerPopulate(t *testing.T) {
{Name: "pod", Value: "loki-1"},
{Name: "trace_id", Value: "3bef3c91643bde73"},
}
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
@ -83,7 +83,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) {
{Name: "pod", Value: "loki-1"},
{Name: "trace_id", Value: "3bef3c91643bde73"},
}
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
@ -120,7 +120,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) {
}
func chunkRefItrFromMetadata(metadata ...push.LabelsAdapter) (iter.EntryIterator, error) {
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
for i, md := range metadata {
if _, err := memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, int64(i)),
@ -205,7 +205,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.Snappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_, _ = memChunk.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: "",

@ -66,7 +66,7 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) {
enc.PutBE64(b.BlockSize)
}
func NewBlockOptions(enc compression.Encoding, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions {
func NewBlockOptions(enc compression.Codec, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions {
opts := NewBlockOptionsFromSchema(Schema{
version: CurrentSchemaVersion,
encoding: enc,

@ -15,12 +15,12 @@ import (
"github.com/grafana/loki/v3/pkg/util/mempool"
)
var blockEncodings = []compression.Encoding{
compression.EncNone,
compression.EncGZIP,
compression.EncSnappy,
compression.EncLZ4_256k,
compression.EncZstd,
var blockEncodings = []compression.Codec{
compression.None,
compression.GZIP,
compression.Snappy,
compression.LZ4_256k,
compression.Zstd,
}
func TestBlockOptions_RoundTrip(t *testing.T) {
@ -28,7 +28,7 @@ func TestBlockOptions_RoundTrip(t *testing.T) {
opts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@ -201,7 +201,7 @@ func TestMergeBuilder(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@ -298,7 +298,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@ -395,7 +395,7 @@ func TestBlockReset(t *testing.T) {
schema := Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
}
builder, err := NewBlockBuilder(
@ -451,7 +451,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
blockOpts := BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy, // test with different encodings?
encoding: compression.Snappy, // test with different encodings?
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,

@ -60,7 +60,7 @@ func TestFusedQuerier(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@ -147,7 +147,7 @@ func TestFusedQuerier_MultiPage(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10, // So we force one bloom per page
@ -296,7 +296,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10, // So we force one series per page
@ -354,7 +354,7 @@ func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) {
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncNone,
encoding: compression.None,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
@ -415,7 +415,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 256 << 10, // 256k
BloomPageSize: 1 << 20, // 1MB

@ -39,13 +39,13 @@ var (
type Schema struct {
version Version
encoding compression.Encoding
encoding compression.Codec
}
func NewSchema() Schema {
return Schema{
version: CurrentSchemaVersion,
encoding: compression.EncNone,
encoding: compression.None,
}
}
@ -105,8 +105,8 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error {
return errors.Errorf("invalid version. expected %d, got %d", 3, s.version)
}
s.encoding = compression.Encoding(dec.Byte())
if _, err := compression.ParseEncoding(s.encoding.String()); err != nil {
s.encoding = compression.Codec(dec.Byte())
if _, err := compression.ParseCodec(s.encoding.String()); err != nil {
return errors.Wrap(err, "parsing encoding")
}

@ -30,7 +30,7 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT
BlockOptions{
Schema: Schema{
version: CurrentSchemaVersion,
encoding: compression.EncSnappy,
encoding: compression.Snappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,

@ -14,7 +14,7 @@ import (
// smallBlockOpts returns a set of block options that are suitable for testing
// characterized by small page sizes
func smallBlockOpts(v Version, enc compression.Encoding) BlockOptions {
func smallBlockOpts(v Version, enc compression.Codec) BlockOptions {
return BlockOptions{
Schema: Schema{
version: v,
@ -33,7 +33,7 @@ func setup(v Version) (BlockOptions, []SeriesWithBlooms, BlockWriter, BlockReade
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)
return smallBlockOpts(v, compression.EncNone), data, writer, reader
return smallBlockOpts(v, compression.None), data, writer, reader
}
func TestV3Roundtrip(t *testing.T) {

@ -35,7 +35,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str
for i := 0; i < 111; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
_, err := cs.Append(&logproto.Entry{
Timestamp: ts.Time(),

@ -82,7 +82,7 @@ func TestGrpcStore(t *testing.T) {
newChunkData := func() chunk.Data {
return chunkenc.NewFacade(
chunkenc.NewMemChunk(
chunkenc.ChunkFormatV3, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0,
chunkenc.ChunkFormatV3, compression.None, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0,
), 0, 0)
}

@ -87,7 +87,7 @@ func CreateChunks(scfg config.SchemaConfig, startIndex, batchSize int, from mode
}
func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk {
cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.GZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
for ts := from; ts <= through; ts = ts.Add(15 * time.Second) {
_, err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)})

@ -312,7 +312,7 @@ func makeChunks(now time.Time, tpls ...c) []chunk.Chunk {
from := int(chk.from) / int(time.Hour)
// This is only here because it's helpful for debugging.
// This isn't even the write format for Loki but we dont' care for the sake of these tests.
memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.None, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0)
// To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data
for i := 0; i < from; i++ {
_, _ = memChk.Append(&logproto.Entry{

@ -104,7 +104,7 @@ func fillStore(cm storage.ClientMetrics) error {
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864)
chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.LZ4_4M, headfmt, 262144, 1572864)
for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
entry := &logproto.Entry{
Timestamp: time.Unix(0, ts),
@ -127,7 +127,7 @@ func fillStore(cm storage.ClientMetrics) error {
if flushCount >= maxChunks {
return
}
chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncLZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864)
chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.LZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864)
}
}
}(i)

@ -2037,7 +2037,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864)
chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.LZ4_4M, headfmt, 262144, 1572864)
for ts := chkFrom; !ts.After(chkThrough); ts = ts.Add(time.Second) {
entry := logproto.Entry{
Timestamp: ts,

@ -753,7 +753,7 @@ func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, fo
samples := 1
chunkStart := now.Add(-time.Hour)
chk := chunkenc.NewMemChunk(format, compression.EncGZIP, headfmt, 256*1024, 0)
chk := chunkenc.NewMemChunk(format, compression.GZIP, headfmt, 256*1024, 0)
for i := 0; i < samples; i++ {
ts := time.Duration(i) * 15 * time.Second
dup, err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)})

@ -93,7 +93,7 @@ func TestChunkWriter_PutOne(t *testing.T) {
chunkfmt, headfmt, err := periodConfig.ChunkFormat()
require.NoError(t, err)
memchk := chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0)
memchk := chunkenc.NewMemChunk(chunkfmt, compression.GZIP, headfmt, 256*1024, 0)
chk := chunk.NewChunk("fake", model.Fingerprint(0), []labels.Label{{Name: "foo", Value: "bar"}}, chunkenc.NewFacade(memchk, 0, 0), 100, 400)
for name, tc := range map[string]struct {

@ -73,7 +73,7 @@ func (r Ref) Interval() Interval {
type BlockRef struct {
Ref
compression.Encoding
compression.Codec
}
func (r BlockRef) String() string {
@ -220,17 +220,17 @@ func newRefFrom(tenant, table string, md v1.BlockMetadata) Ref {
}
}
func newBlockRefWithEncoding(ref Ref, enc compression.Encoding) BlockRef {
return BlockRef{Ref: ref, Encoding: enc}
func newBlockRefWithEncoding(ref Ref, enc compression.Codec) BlockRef {
return BlockRef{Ref: ref, Codec: enc}
}
func BlockFrom(enc compression.Encoding, tenant, table string, blk *v1.Block) (Block, error) {
func BlockFrom(enc compression.Codec, tenant, table string, blk *v1.Block) (Block, error) {
md, _ := blk.Metadata()
ref := newBlockRefWithEncoding(newRefFrom(tenant, table, md), enc)
// TODO(owen-d): pool
buf := bytes.NewBuffer(nil)
err := v1.TarCompress(ref.Encoding, buf, blk.Reader())
err := v1.TarCompress(ref.Codec, buf, blk.Reader())
if err != nil {
return Block{}, err
@ -330,7 +330,7 @@ func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirector
return BlockDirectory{}, fmt.Errorf("failed to create block directory %s: %w", path, err)
}
err = v1.UnTarCompress(ref.Encoding, path, rc)
err = v1.UnTarCompress(ref.Codec, path, rc)
if err != nil {
return BlockDirectory{}, fmt.Errorf("failed to extract block file %s: %w", key, err)
}

@ -21,16 +21,16 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
)
var supportedCompressions = []compression.Encoding{
compression.EncNone,
compression.EncGZIP,
compression.EncSnappy,
compression.EncLZ4_64k,
compression.EncLZ4_256k,
compression.EncLZ4_1M,
compression.EncLZ4_4M,
compression.EncFlate,
compression.EncZstd,
var supportedCompressions = []compression.Codec{
compression.None,
compression.GZIP,
compression.Snappy,
compression.LZ4_64k,
compression.LZ4_256k,
compression.LZ4_1M,
compression.LZ4_4M,
compression.Flate,
compression.Zstd,
}
func parseTime(s string) model.Time {
@ -209,7 +209,7 @@ func TestBloomClient_DeleteMetas(t *testing.T) {
})
}
func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Encoding) (Block, error) {
func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Codec) (Block, error) {
step := int64((24 * time.Hour).Seconds())
day := start.Unix() / step
@ -234,7 +234,7 @@ func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, min
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
Encoding: enc,
Codec: enc,
},
Data: fp,
}
@ -273,9 +273,9 @@ func TestBloomClient_GetBlocks(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.EncGZIP)
b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.GZIP)
require.NoError(t, err)
b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.EncNone)
b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.None)
require.NoError(t, err)
t.Run("exists", func(t *testing.T) {
@ -318,7 +318,7 @@ func TestBloomClient_PutBlock(t *testing.T) {
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
Encoding: enc,
Codec: enc,
},
Data: fp,
}
@ -343,11 +343,11 @@ func TestBloomClient_DeleteBlocks(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.EncNone)
b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.None)
require.NoError(t, err)
b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.EncGZIP)
b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.GZIP)
require.NoError(t, err)
b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.EncSnappy)
b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.Snappy)
require.NoError(t, err)
oc := c.client.(*testutils.InMemoryObjectClient)

@ -329,11 +329,11 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) {
refs := []BlockRef{
// no directory for block
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Encoding: compression.EncNone},
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Codec: compression.None},
// invalid directory for block
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Encoding: compression.EncSnappy},
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Codec: compression.Snappy},
// valid directory for block
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Encoding: compression.EncGZIP},
{Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Codec: compression.GZIP},
}
dirs := []string{
localFilePathWithoutExtension(refs[0], resolver),

@ -81,7 +81,7 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) {
}
func (defaultKeyResolver) Block(ref BlockRef) Location {
ext := blockExtension + compression.ToFileExtension(ref.Encoding)
ext := blockExtension + compression.ToFileExtension(ref.Codec)
return simpleLocation{
BloomPrefix,
fmt.Sprintf("%v", ref.TableName),
@ -95,7 +95,7 @@ func (defaultKeyResolver) Block(ref BlockRef) Location {
func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) {
dir, fn := path.Split(loc.Addr())
ext, enc := path.Ext(fn), compression.EncNone
ext, enc := path.Ext(fn), compression.None
if ext != "" && ext != blockExtension {
// trim compression extension
fn = strings.TrimSuffix(fn, ext)
@ -142,7 +142,7 @@ func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) {
EndTimestamp: interval.End,
Checksum: uint32(checksum),
},
Encoding: enc,
Codec: enc,
}, nil
}
@ -286,9 +286,9 @@ func (ls locations) LocalPath() string {
}
func cacheKey(ref BlockRef) string {
return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Encoding))
return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Codec))
}
func localFilePathWithoutExtension(ref BlockRef, res KeyResolver) string {
return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Encoding))
return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Codec))
}

@ -33,17 +33,17 @@ func TestResolver_ParseMetaKey(t *testing.T) {
func TestResolver_ParseBlockKey(t *testing.T) {
for _, tc := range []struct {
srcEnc, dstEnc compression.Encoding
srcEnc, dstEnc compression.Codec
}{
{compression.EncNone, compression.EncNone},
{compression.EncGZIP, compression.EncGZIP},
{compression.EncSnappy, compression.EncSnappy},
{compression.EncLZ4_64k, compression.EncLZ4_4M},
{compression.EncLZ4_256k, compression.EncLZ4_4M},
{compression.EncLZ4_1M, compression.EncLZ4_4M},
{compression.EncLZ4_4M, compression.EncLZ4_4M},
{compression.EncFlate, compression.EncFlate},
{compression.EncZstd, compression.EncZstd},
{compression.None, compression.None},
{compression.GZIP, compression.GZIP},
{compression.Snappy, compression.Snappy},
{compression.LZ4_64k, compression.LZ4_4M},
{compression.LZ4_256k, compression.LZ4_4M},
{compression.LZ4_1M, compression.LZ4_4M},
{compression.LZ4_4M, compression.LZ4_4M},
{compression.Flate, compression.Flate},
{compression.Zstd, compression.Zstd},
} {
t.Run(tc.srcEnc.String(), func(t *testing.T) {
r := defaultKeyResolver{}
@ -56,7 +56,7 @@ func TestResolver_ParseBlockKey(t *testing.T) {
EndTimestamp: 3600000,
Checksum: 43981,
},
Encoding: tc.srcEnc,
Codec: tc.srcEnc,
}
// encode block ref as string
@ -69,8 +69,8 @@ func TestResolver_ParseBlockKey(t *testing.T) {
parsed, err := r.ParseBlockKey(key(path))
require.NoError(t, err)
expected := BlockRef{
Ref: ref.Ref,
Encoding: tc.dstEnc,
Ref: ref.Ref,
Codec: tc.dstEnc,
}
require.Equal(t, expected, parsed)
})

@ -116,7 +116,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start
err := blockWriter.Init()
require.NoError(t, err)
enc := compression.EncGZIP
enc := compression.GZIP
err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)
@ -130,7 +130,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
Encoding: enc,
Codec: enc,
},
Data: fp,
}

@ -32,7 +32,7 @@ func createChunk(t testing.TB, chunkFormat byte, headBlockFmt chunkenc.HeadBlock
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := ingesterclient.Fingerprint(lbs)
chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.EncSnappy, headBlockFmt, blockSize, targetSize)
chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.Snappy, headBlockFmt, blockSize, targetSize)
for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
dup, err := chunkEnc.Append(&logproto.Entry{

@ -145,7 +145,7 @@ func (t *indexSet) uploadIndex(ctx context.Context, idx index.Index) error {
}
}()
gzipPool := compression.GetWriterPool(compression.EncGZIP)
gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)

@ -109,7 +109,7 @@ func newChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logpr
lbs = builder.Labels()
}
from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp)
chk := chunkenc.NewMemChunk(chunkFormat, compression.EncGZIP, headBlockFmt, 256*1024, 0)
chk := chunkenc.NewMemChunk(chunkFormat, compression.GZIP, headBlockFmt, 256*1024, 0)
for _, e := range stream.Entries {
_, _ = chk.Append(&e)
}

@ -490,7 +490,7 @@ func (l *Limits) Validate() error {
return errors.Wrap(err, "invalid tsdb sharding strategy")
}
if _, err := compression.ParseEncoding(l.BloomBlockEncoding); err != nil {
if _, err := compression.ParseCodec(l.BloomBlockEncoding); err != nil {
return err
}

@ -339,7 +339,7 @@ func TestLimitsValidation(t *testing.T) {
},
{
limits: Limits{DeletionMode: "disabled", BloomBlockEncoding: "unknown"},
expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedEncoding()),
expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedCodecs()),
},
} {
desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding)

@ -257,7 +257,7 @@ func uploadFile(idx shipperindex.Index, indexStorageClient shipperstorage.Client
}
}()
gzipPool := compression.GetWriterPool(compression.EncGZIP)
gzipPool := compression.GetWriterPool(compression.GZIP)
compressedWriter := gzipPool.GetWriter(f)
defer gzipPool.PutWriter(compressedWriter)

Loading…
Cancel
Save