@ -10,6 +10,8 @@ import (
"io/ioutil"
"github.com/golang/snappy"
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4"
)
@ -33,13 +35,31 @@ func init() {
}
var (
encNone = Encoding { code : 0 , name : "none" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return reader , nil } }
encGZIP = Encoding { code : 1 , name : "gzip" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return gzip . NewReader ( reader ) } }
encDumb = Encoding { code : 2 , name : "dumb" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return reader , nil } }
encLZ4 = Encoding { code : 3 , name : "lz4" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return lz4 . NewReader ( reader ) , nil } }
encSnappy = Encoding { code : 4 , name : "snappy" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return snappy . NewReader ( reader ) , nil } }
encNone = Encoding { code : 0 , name : "none" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return reader , nil } }
encGZIP = Encoding { code : 1 , name : "gzip" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return gzip . NewReader ( reader ) } }
encDumb = Encoding { code : 2 , name : "dumb" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return reader , nil } }
encLZ4 = Encoding { code : 3 , name : "lz4" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return lz4 . NewReader ( reader ) , nil } }
encSnappy = Encoding { code : 4 , name : "snappy" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return snappy . NewReader ( reader ) , nil } }
enclz4_256k = Encoding { code : 5 , name : "lz4-256k" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return lz4 . NewReader ( reader ) , nil } }
enclz4_1M = Encoding { code : 6 , name : "lz4-1M" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return lz4 . NewReader ( reader ) , nil } }
enclz4_4M = Encoding { code : 7 , name : "lz4-4M" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return lz4 . NewReader ( reader ) , nil } }
encFlate = Encoding { code : 8 , name : "flate" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) { return flate . NewReader ( reader ) , nil } }
encZstd = Encoding { code : 9 , name : "lz4-256k" , readerFn : func ( reader io . Reader ) ( io . Reader , error ) {
r , err := zstd . NewReader ( reader )
if err != nil {
panic ( err )
}
return r , nil
} }
Encodings = [ ] Encoding { encNone , encGZIP , encDumb , encLZ4 , encSnappy }
Encodings = [ ] Encoding { encNone , encGZIP , encDumb , encLZ4 , encSnappy , enclz4_256k , enclz4_1M , enclz4_4M , encFlate , encZstd }
)
const (
_ byte = iota
chunkFormatV1
chunkFormatV2
chunkFormatV3
)
type LokiChunk struct {
@ -58,6 +78,8 @@ type LokiBlock struct {
dataOffset uint64 // offset in the data-part of chunks file
uncompSize uint64 // size of the original data uncompressed
rawData [ ] byte // data as stored in chunk file, compressed
originalData [ ] byte // data uncompressed from rawData
@ -73,6 +95,27 @@ type LokiEntry struct {
}
func parseLokiChunk ( chunkHeader * ChunkHeader , r io . Reader ) ( * LokiChunk , error ) {
/ * Loki Chunk Format
4 B magic number
1 B version
1 B encoding
Block 1 <- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - B
Block 1 Checksum
...
Uvarint # blocks <- -- -- -- -- -- -- -- -- -- -- -- -- - A
Block1 Uvarint # entries
Block1 Varint64 mint
Block1 Varint64 maxt
Block1 Varint64 offset -- -- -- -- -- -- -- -- -- -- > B
Block1 Uvarint uncomp size ( V3 chunks and greater only )
Block1 Uvarint length
Block1 Meta Checksum
...
4 B Meta offset -- -- -- -- -- -- -- -- -- -- -- -- -- -- > A
* /
// Loki chunks need to be loaded into memory, because some offsets are actually stored at the end.
data := make ( [ ] byte , chunkHeader . DataLength )
if _ , err := io . ReadFull ( r , data ) ; err != nil {
@ -83,7 +126,10 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {
return nil , fmt . Errorf ( "invalid magic number: %0x" , num )
}
compression , err := getCompression ( data [ 4 ] , data [ 5 ] )
// Chunk format is at position 4
f := data [ 4 ]
compression , err := getCompression ( f , data [ 5 ] )
if err != nil {
return nil , fmt . Errorf ( "failed to read compression: %w" , err )
}
@ -115,6 +161,9 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {
block . minT , metadata , err = readVarint ( err , metadata )
block . maxT , metadata , err = readVarint ( err , metadata )
block . dataOffset , metadata , err = readUvarint ( err , metadata )
if f >= chunkFormatV3 {
block . uncompSize , metadata , err = readUvarint ( err , metadata )
}
dataLength := uint64 ( 0 )
dataLength , metadata , err = readUvarint ( err , metadata )
@ -195,11 +244,11 @@ func readUvarint(prevErr error, buf []byte) (uint64, []byte, error) {
}
func getCompression ( format byte , code byte ) ( Encoding , error ) {
if format == 1 {
if format == chunkFormatV 1 {
return encGZIP , nil
}
if format == 2 {
if format >= chunkFormatV 2 {
for _ , e := range Encodings {
if e . code == int ( code ) {
return e , nil