mirror of https://github.com/grafana/loki

Improve lz4 compression (#2614)

* Improve lz4 compression.
  - Move to v4.
  - Remove unneeded checksumming.
  - The default now writes 4 MB blocks; this remains backward compatible for reads.
  Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* vendor update
  Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

pull/2705/head
parent 1b2bae46ba
commit 6500f82195
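For context, a minimal sketch of how a caller drives the new v4 writer with the defaults this change relies on (4 MB blocks). lz4.NewWriter, Writer.Apply and Writer.Close are assumed from the pierrec/lz4/v4 API surface; BlockSizeOption and Block4Mb are defined in the vendored options.go added further down in this diff.

package main

import (
	"bytes"
	"io"
	"io/ioutil"
	"os"

	"github.com/pierrec/lz4/v4"
)

func main() {
	var buf bytes.Buffer

	// NewWriter/Apply/Close are assumed from the v4 API; the option values are
	// the ones added by this vendor bump.
	zw := lz4.NewWriter(&buf)
	if err := zw.Apply(lz4.BlockSizeOption(lz4.Block4Mb)); err != nil { // explicit, matches the new default
		panic(err)
	}
	if _, err := io.Copy(zw, os.Stdin); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil { // flush the last block and write the end mark
		panic(err)
	}

	// Reading is unchanged, so frames written before this change keep decoding
	// the same way.
	zr := lz4.NewReader(&buf)
	if _, err := io.Copy(ioutil.Discard, zr); err != nil {
		panic(err)
	}
}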
@@ -1,23 +0,0 @@
// +build lz4debug

package lz4

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
)

const debugFlag = true

func debug(args ...interface{}) {
	_, file, line, _ := runtime.Caller(1)
	file = filepath.Base(file)

	f := fmt.Sprintf("LZ4: %s:%d %s", file, line, args[0])
	if f[len(f)-1] != '\n' {
		f += "\n"
	}
	fmt.Fprintf(os.Stderr, f, args[1:]...)
}
@@ -1,7 +0,0 @@
// +build !lz4debug

package lz4

const debugFlag = false

func debug(args ...interface{}) {}
@@ -1,30 +0,0 @@
package lz4

import (
	"errors"
	"fmt"
	"os"
	rdebug "runtime/debug"
)

var (
	// ErrInvalidSourceShortBuffer is returned by UncompressBlock or CompressBlock when a compressed
	// block is corrupted or the destination buffer is not large enough for the uncompressed data.
	ErrInvalidSourceShortBuffer = errors.New("lz4: invalid source or destination buffer too short")
	// ErrInvalid is returned when reading an invalid LZ4 archive.
	ErrInvalid = errors.New("lz4: bad magic number")
	// ErrBlockDependency is returned when attempting to decompress an archive created with block dependency.
	ErrBlockDependency = errors.New("lz4: block dependency not supported")
	// ErrUnsupportedSeek is returned when attempting to Seek any way but forward from the current position.
	ErrUnsupportedSeek = errors.New("lz4: can only seek forward from io.SeekCurrent")
)

func recoverBlock(e *error) {
	if r := recover(); r != nil && *e == nil {
		if debugFlag {
			fmt.Fprintln(os.Stderr, r)
			rdebug.PrintStack()
		}
		*e = ErrInvalidSourceShortBuffer
	}
}
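recoverBlock turns an out-of-bounds panic in the hand-rolled block decoder into a regular error. A minimal sketch of the calling pattern it is designed for; the function name doDecode is made up for illustration and is not part of the library.

// Hypothetical caller: convert a decoder panic into ErrInvalidSourceShortBuffer.
func doDecode(dst, src []byte) (n int, err error) {
	defer recoverBlock(&err)
	// ... index into src/dst without bounds checks; corrupted input may panic ...
	n = copy(dst, src) // placeholder body
	return n, err
}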
@@ -1,113 +0,0 @@
// Package lz4 implements reading and writing lz4 compressed data (a frame),
// as specified in http://fastcompression.blogspot.fr/2013/04/lz4-streaming-format-final.html.
//
// Although the block level compression and decompression functions are exposed and are fully compatible
// with the lz4 block format definition, they are low level and should not be used directly.
// For a complete description of an lz4 compressed block, see:
// http://fastcompression.blogspot.fr/2011/05/lz4-explained.html
//
// See https://github.com/Cyan4973/lz4 for the reference C implementation.
//
package lz4

import "math/bits"

import "sync"

const (
	// Extension is the LZ4 frame file name extension
	Extension = ".lz4"
	// Version is the LZ4 frame format version
	Version = 1

	frameMagic     uint32 = 0x184D2204
	frameSkipMagic uint32 = 0x184D2A50

	// The following constants are used to setup the compression algorithm.
	minMatch   = 4  // the minimum size of the match sequence size (4 bytes)
	winSizeLog = 16 // LZ4 64Kb window size limit
	winSize    = 1 << winSizeLog
	winMask    = winSize - 1 // 64Kb window of previous data for dependent blocks

	compressedBlockFlag = 1 << 31
	compressedBlockMask = compressedBlockFlag - 1

	// hashLog determines the size of the hash table used to quickly find a previous match position.
	// Its value influences the compression speed and memory usage, the lower the faster,
	// but at the expense of the compression ratio.
	// 16 seems to be the best compromise for fast compression.
	hashLog = 16
	htSize  = 1 << hashLog

	mfLimit = 10 + minMatch // The last match cannot start within the last 14 bytes.
)

// map the block max size id with its value in bytes: 64Kb, 256Kb, 1Mb and 4Mb.
const (
	blockSize64K = 1 << (16 + 2*iota)
	blockSize256K
	blockSize1M
	blockSize4M
)

var (
	// Keep a pool of buffers for each valid block size.
	bsMapValue = [...]*sync.Pool{
		newBufferPool(2 * blockSize64K),
		newBufferPool(2 * blockSize256K),
		newBufferPool(2 * blockSize1M),
		newBufferPool(2 * blockSize4M),
	}
)

// newBufferPool returns a pool for buffers of the given size.
func newBufferPool(size int) *sync.Pool {
	return &sync.Pool{
		New: func() interface{} {
			return make([]byte, size)
		},
	}
}

// getBuffer returns a buffer from its pool.
func getBuffer(size int) []byte {
	idx := blockSizeValueToIndex(size) - 4
	return bsMapValue[idx].Get().([]byte)
}

// putBuffer returns a buffer to its pool.
func putBuffer(size int, buf []byte) {
	if cap(buf) > 0 {
		idx := blockSizeValueToIndex(size) - 4
		bsMapValue[idx].Put(buf[:cap(buf)])
	}
}
func blockSizeIndexToValue(i byte) int {
	return 1 << (16 + 2*uint(i))
}
func isValidBlockSize(size int) bool {
	const blockSizeMask = blockSize64K | blockSize256K | blockSize1M | blockSize4M

	return size&blockSizeMask > 0 && bits.OnesCount(uint(size)) == 1
}
func blockSizeValueToIndex(size int) byte {
	return 4 + byte(bits.TrailingZeros(uint(size)>>16)/2)
}

// Header describes the various flags that can be set on a Writer or obtained from a Reader.
// The default values match those of the LZ4 frame format definition
// (http://fastcompression.blogspot.com/2013/04/lz4-streaming-format-final.html).
//
// NB. in a Reader, in case of concatenated frames, the Header values may change between Read() calls.
// It is the caller's responsibility to check them if necessary.
type Header struct {
	BlockChecksum    bool   // Compressed blocks checksum flag.
	NoChecksum       bool   // Frame checksum flag.
	BlockMaxSize     int    // Size of the uncompressed data block (one of [64KB, 256KB, 1MB, 4MB]). Default=4MB.
	Size             uint64 // Frame total size. It is _not_ computed by the Writer.
	CompressionLevel int    // Compression level (higher is better, use 0 for fastest compression).
	done             bool   // Header processed flag (Read or Write and checked).
}

func (h *Header) Reset() {
	h.done = false
}
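A quick illustration of the block-size bookkeeping above: the only valid sizes are 64 KB, 256 KB, 1 MB and 4 MB, and they map to frame descriptor IDs 4 through 7. This snippet is purely illustrative (it would have to live inside the old package to reach the unexported helpers) and is not part of the vendored code.

package lz4

import "fmt"

// blockSizeExample is an illustrative helper, not part of the library.
func blockSizeExample() {
	for _, size := range []int{blockSize64K, blockSize256K, blockSize1M, blockSize4M} {
		idx := blockSizeValueToIndex(size) // 4, 5, 6, 7
		back := blockSizeIndexToValue(idx - 4)
		fmt.Printf("size=%d id=%d roundtrip=%v valid=%v\n",
			size, idx, back == size, isValidBlockSize(size))
	}
	fmt.Println(isValidBlockSize(123456)) // false: not one of the four allowed sizes
}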
@@ -1,29 +0,0 @@
//+build go1.10

package lz4

import (
	"fmt"
	"strings"
)

func (h Header) String() string {
	var s strings.Builder

	s.WriteString(fmt.Sprintf("%T{", h))
	if h.BlockChecksum {
		s.WriteString("BlockChecksum: true ")
	}
	if h.NoChecksum {
		s.WriteString("NoChecksum: true ")
	}
	if bs := h.BlockMaxSize; bs != 0 && bs != 4<<20 {
		s.WriteString(fmt.Sprintf("BlockMaxSize: %d ", bs))
	}
	if l := h.CompressionLevel; l != 0 {
		s.WriteString(fmt.Sprintf("CompressionLevel: %d ", l))
	}
	s.WriteByte('}')

	return s.String()
}
@@ -1,29 +0,0 @@
//+build !go1.10

package lz4

import (
	"bytes"
	"fmt"
)

func (h Header) String() string {
	var s bytes.Buffer

	s.WriteString(fmt.Sprintf("%T{", h))
	if h.BlockChecksum {
		s.WriteString("BlockChecksum: true ")
	}
	if h.NoChecksum {
		s.WriteString("NoChecksum: true ")
	}
	if bs := h.BlockMaxSize; bs != 0 && bs != 4<<20 {
		s.WriteString(fmt.Sprintf("BlockMaxSize: %d ", bs))
	}
	if l := h.CompressionLevel; l != 0 {
		s.WriteString(fmt.Sprintf("CompressionLevel: %d ", l))
	}
	s.WriteByte('}')

	return s.String()
}
@ -1,335 +0,0 @@ |
||||
package lz4 |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"io/ioutil" |
||||
|
||||
"github.com/pierrec/lz4/internal/xxh32" |
||||
) |
||||
|
||||
// Reader implements the LZ4 frame decoder.
|
||||
// The Header is set after the first call to Read().
|
||||
// The Header may change between Read() calls in case of concatenated frames.
|
||||
type Reader struct { |
||||
Header |
||||
// Handler called when a block has been successfully read.
|
||||
// It provides the number of bytes read.
|
||||
OnBlockDone func(size int) |
||||
|
||||
buf [8]byte // Scrap buffer.
|
||||
pos int64 // Current position in src.
|
||||
src io.Reader // Source.
|
||||
zdata []byte // Compressed data.
|
||||
data []byte // Uncompressed data.
|
||||
idx int // Index of unread bytes into data.
|
||||
checksum xxh32.XXHZero // Frame hash.
|
||||
skip int64 // Bytes to skip before next read.
|
||||
dpos int64 // Position in dest
|
||||
} |
||||
|
||||
// NewReader returns a new LZ4 frame decoder.
|
||||
// No access to the underlying io.Reader is performed.
|
||||
func NewReader(src io.Reader) *Reader { |
||||
r := &Reader{src: src} |
||||
return r |
||||
} |
||||
|
||||
// readHeader checks the frame magic number and parses the frame descriptoz.
|
||||
// Skippable frames are supported even as a first frame although the LZ4
|
||||
// specifications recommends skippable frames not to be used as first frames.
|
||||
func (z *Reader) readHeader(first bool) error { |
||||
defer z.checksum.Reset() |
||||
|
||||
buf := z.buf[:] |
||||
for { |
||||
magic, err := z.readUint32() |
||||
if err != nil { |
||||
z.pos += 4 |
||||
if !first && err == io.ErrUnexpectedEOF { |
||||
return io.EOF |
||||
} |
||||
return err |
||||
} |
||||
if magic == frameMagic { |
||||
break |
||||
} |
||||
if magic>>8 != frameSkipMagic>>8 { |
||||
return ErrInvalid |
||||
} |
||||
skipSize, err := z.readUint32() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
z.pos += 4 |
||||
m, err := io.CopyN(ioutil.Discard, z.src, int64(skipSize)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
z.pos += m |
||||
} |
||||
|
||||
// Header.
|
||||
if _, err := io.ReadFull(z.src, buf[:2]); err != nil { |
||||
return err |
||||
} |
||||
z.pos += 8 |
||||
|
||||
b := buf[0] |
||||
if v := b >> 6; v != Version { |
||||
return fmt.Errorf("lz4: invalid version: got %d; expected %d", v, Version) |
||||
} |
||||
if b>>5&1 == 0 { |
||||
return ErrBlockDependency |
||||
} |
||||
z.BlockChecksum = b>>4&1 > 0 |
||||
frameSize := b>>3&1 > 0 |
||||
z.NoChecksum = b>>2&1 == 0 |
||||
|
||||
bmsID := buf[1] >> 4 & 0x7 |
||||
if bmsID < 4 || bmsID > 7 { |
||||
return fmt.Errorf("lz4: invalid block max size ID: %d", bmsID) |
||||
} |
||||
bSize := blockSizeIndexToValue(bmsID - 4) |
||||
z.BlockMaxSize = bSize |
||||
|
||||
// Allocate the compressed/uncompressed buffers.
|
||||
// The compressed buffer cannot exceed the uncompressed one.
|
||||
if n := 2 * bSize; cap(z.zdata) < n { |
||||
z.zdata = make([]byte, n, n) |
||||
} |
||||
if debugFlag { |
||||
debug("header block max size id=%d size=%d", bmsID, bSize) |
||||
} |
||||
z.zdata = z.zdata[:bSize] |
||||
z.data = z.zdata[:cap(z.zdata)][bSize:] |
||||
z.idx = len(z.data) |
||||
|
||||
_, _ = z.checksum.Write(buf[0:2]) |
||||
|
||||
if frameSize { |
||||
buf := buf[:8] |
||||
if _, err := io.ReadFull(z.src, buf); err != nil { |
||||
return err |
||||
} |
||||
z.Size = binary.LittleEndian.Uint64(buf) |
||||
z.pos += 8 |
||||
_, _ = z.checksum.Write(buf) |
||||
} |
||||
|
||||
// Header checksum.
|
||||
if _, err := io.ReadFull(z.src, buf[:1]); err != nil { |
||||
return err |
||||
} |
||||
z.pos++ |
||||
if h := byte(z.checksum.Sum32() >> 8 & 0xFF); h != buf[0] { |
||||
return fmt.Errorf("lz4: invalid header checksum: got %x; expected %x", buf[0], h) |
||||
} |
||||
|
||||
z.Header.done = true |
||||
if debugFlag { |
||||
debug("header read: %v", z.Header) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Read decompresses data from the underlying source into the supplied buffer.
|
||||
//
|
||||
// Since there can be multiple streams concatenated, Header values may
|
||||
// change between calls to Read(). If that is the case, no data is actually read from
|
||||
// the underlying io.Reader, to allow for potential input buffer resizing.
|
||||
func (z *Reader) Read(buf []byte) (int, error) { |
||||
if debugFlag { |
||||
debug("Read buf len=%d", len(buf)) |
||||
} |
||||
if !z.Header.done { |
||||
if err := z.readHeader(true); err != nil { |
||||
return 0, err |
||||
} |
||||
if debugFlag { |
||||
debug("header read OK compressed buffer %d / %d uncompressed buffer %d : %d index=%d", |
||||
len(z.zdata), cap(z.zdata), len(z.data), cap(z.data), z.idx) |
||||
} |
||||
} |
||||
|
||||
if len(buf) == 0 { |
||||
return 0, nil |
||||
} |
||||
|
||||
if z.idx == len(z.data) { |
||||
// No data ready for reading, process the next block.
|
||||
if debugFlag { |
||||
debug("reading block from writer") |
||||
} |
||||
// Reset uncompressed buffer
|
||||
z.data = z.zdata[:cap(z.zdata)][len(z.zdata):] |
||||
|
||||
// Block length: 0 = end of frame, highest bit set: uncompressed.
|
||||
bLen, err := z.readUint32() |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
z.pos += 4 |
||||
|
||||
if bLen == 0 { |
||||
// End of frame reached.
|
||||
if !z.NoChecksum { |
||||
// Validate the frame checksum.
|
||||
checksum, err := z.readUint32() |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
if debugFlag { |
||||
debug("frame checksum got=%x / want=%x", z.checksum.Sum32(), checksum) |
||||
} |
||||
z.pos += 4 |
||||
if h := z.checksum.Sum32(); checksum != h { |
||||
return 0, fmt.Errorf("lz4: invalid frame checksum: got %x; expected %x", h, checksum) |
||||
} |
||||
} |
||||
|
||||
// Get ready for the next concatenated frame and keep the position.
|
||||
pos := z.pos |
||||
z.Reset(z.src) |
||||
z.pos = pos |
||||
|
||||
// Since multiple frames can be concatenated, check for more.
|
||||
return 0, z.readHeader(false) |
||||
} |
||||
|
||||
if debugFlag { |
||||
debug("raw block size %d", bLen) |
||||
} |
||||
if bLen&compressedBlockFlag > 0 { |
||||
// Uncompressed block.
|
||||
bLen &= compressedBlockMask |
||||
if debugFlag { |
||||
debug("uncompressed block size %d", bLen) |
||||
} |
||||
if int(bLen) > cap(z.data) { |
||||
return 0, fmt.Errorf("lz4: invalid block size: %d", bLen) |
||||
} |
||||
z.data = z.data[:bLen] |
||||
if _, err := io.ReadFull(z.src, z.data); err != nil { |
||||
return 0, err |
||||
} |
||||
z.pos += int64(bLen) |
||||
if z.OnBlockDone != nil { |
||||
z.OnBlockDone(int(bLen)) |
||||
} |
||||
|
||||
if z.BlockChecksum { |
||||
checksum, err := z.readUint32() |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
z.pos += 4 |
||||
|
||||
if h := xxh32.ChecksumZero(z.data); h != checksum { |
||||
return 0, fmt.Errorf("lz4: invalid block checksum: got %x; expected %x", h, checksum) |
||||
} |
||||
} |
||||
|
||||
} else { |
||||
// Compressed block.
|
||||
if debugFlag { |
||||
debug("compressed block size %d", bLen) |
||||
} |
||||
if int(bLen) > cap(z.data) { |
||||
return 0, fmt.Errorf("lz4: invalid block size: %d", bLen) |
||||
} |
||||
zdata := z.zdata[:bLen] |
||||
if _, err := io.ReadFull(z.src, zdata); err != nil { |
||||
return 0, err |
||||
} |
||||
z.pos += int64(bLen) |
||||
|
||||
if z.BlockChecksum { |
||||
checksum, err := z.readUint32() |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
z.pos += 4 |
||||
|
||||
if h := xxh32.ChecksumZero(zdata); h != checksum { |
||||
return 0, fmt.Errorf("lz4: invalid block checksum: got %x; expected %x", h, checksum) |
||||
} |
||||
} |
||||
|
||||
n, err := UncompressBlock(zdata, z.data) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
z.data = z.data[:n] |
||||
if z.OnBlockDone != nil { |
||||
z.OnBlockDone(n) |
||||
} |
||||
} |
||||
|
||||
if !z.NoChecksum { |
||||
_, _ = z.checksum.Write(z.data) |
||||
if debugFlag { |
||||
debug("current frame checksum %x", z.checksum.Sum32()) |
||||
} |
||||
} |
||||
z.idx = 0 |
||||
} |
||||
|
||||
if z.skip > int64(len(z.data[z.idx:])) { |
||||
z.skip -= int64(len(z.data[z.idx:])) |
||||
z.dpos += int64(len(z.data[z.idx:])) |
||||
z.idx = len(z.data) |
||||
return 0, nil |
||||
} |
||||
|
||||
z.idx += int(z.skip) |
||||
z.dpos += z.skip |
||||
z.skip = 0 |
||||
|
||||
n := copy(buf, z.data[z.idx:]) |
||||
z.idx += n |
||||
z.dpos += int64(n) |
||||
if debugFlag { |
||||
debug("copied %d bytes to input", n) |
||||
} |
||||
|
||||
return n, nil |
||||
} |
||||
|
||||
// Seek implements io.Seeker, but supports seeking forward from the current
|
||||
// position only. Any other seek will return an error. Allows skipping output
|
||||
// bytes which aren't needed, which in some scenarios is faster than reading
|
||||
// and discarding them.
|
||||
// Note this may cause future calls to Read() to read 0 bytes if all of the
|
||||
// data they would have returned is skipped.
|
||||
func (z *Reader) Seek(offset int64, whence int) (int64, error) { |
||||
if offset < 0 || whence != io.SeekCurrent { |
||||
return z.dpos + z.skip, ErrUnsupportedSeek |
||||
} |
||||
z.skip += offset |
||||
return z.dpos + z.skip, nil |
||||
} |
||||
|
||||
// Reset discards the Reader's state and makes it equivalent to the
|
||||
// result of its original state from NewReader, but reading from r instead.
|
||||
// This permits reusing a Reader rather than allocating a new one.
|
||||
func (z *Reader) Reset(r io.Reader) { |
||||
z.Header = Header{} |
||||
z.pos = 0 |
||||
z.src = r |
||||
z.zdata = z.zdata[:0] |
||||
z.data = z.data[:0] |
||||
z.idx = 0 |
||||
z.checksum.Reset() |
||||
} |
||||
|
||||
// readUint32 reads an uint32 into the supplied buffer.
|
||||
// The idea is to make use of the already allocated buffers avoiding additional allocations.
|
||||
func (z *Reader) readUint32() (uint32, error) { |
||||
buf := z.buf[:4] |
||||
_, err := io.ReadFull(z.src, buf) |
||||
x := binary.LittleEndian.Uint32(buf) |
||||
return x, err |
||||
} |
vendor/github.com/pierrec/lz4/LICENSE → vendor/github.com/pierrec/lz4/v4/LICENSE (generated, vendored, 0 changes)
vendor/github.com/pierrec/lz4/README.md → vendor/github.com/pierrec/lz4/v4/README.md (generated, vendored, 0 changes)
@@ -0,0 +1,10 @@
module github.com/pierrec/lz4/v4

go 1.14

require (
	code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48
	github.com/onsi/ginkgo v1.14.0 // indirect
	github.com/pierrec/cmdflag v0.0.2
	github.com/schollz/progressbar/v3 v3.3.4
)
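Because the module path now carries the /v4 suffix, consuming code has to change its import path when picking up this vendor bump. A sketch of the before/after in a hypothetical consumer package (the package name is made up for illustration):

package consumer_example // hypothetical consumer, for illustration only

// Before this change the vendored package was imported as:
//
//	import "github.com/pierrec/lz4"
//
// With the move to v4 the module path gains a major-version suffix:
import lz4 "github.com/pierrec/lz4/v4"

var _ = lz4.Block4Mb // referenced so the import is used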
@ -0,0 +1,81 @@ |
||||
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48 h1:/EMHruHCFXR9xClkGV/t0rmHrdhX4+trQUcBqjwc9xE= |
||||
code.cloudfoundry.org/bytefmt v0.0.0-20200131002437-cf55d5288a48/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= |
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= |
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= |
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= |
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= |
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= |
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= |
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= |
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= |
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= |
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= |
||||
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= |
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= |
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= |
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= |
||||
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= |
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= |
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= |
||||
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= |
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= |
||||
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= |
||||
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= |
||||
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= |
||||
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= |
||||
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= |
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= |
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= |
||||
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= |
||||
github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= |
||||
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= |
||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= |
||||
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= |
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= |
||||
github.com/pierrec/cmdflag v0.0.2 h1:ybjGJnPr/aURn2IKWjO49znx9N0DL6YfGsIxN0PYuVY= |
||||
github.com/pierrec/cmdflag v0.0.2/go.mod h1:a3zKGZ3cdQUfxjd0RGMLZr8xI3nvpJOB+m6o/1X5BmU= |
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= |
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= |
||||
github.com/schollz/progressbar/v3 v3.3.4 h1:nMinx+JaEm/zJz4cEyClQeAw5rsYSB5th3xv+5lV6Vg= |
||||
github.com/schollz/progressbar/v3 v3.3.4/go.mod h1:Rp5lZwpgtYmlvmGo1FyDwXMqagyRBQYSDwzlP9QDu84= |
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= |
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= |
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= |
||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0= |
||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= |
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
||||
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= |
||||
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= |
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= |
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= |
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= |
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= |
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= |
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= |
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= |
||||
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= |
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= |
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= |
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= |
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= |
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= |
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= |
||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= |
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= |
@@ -0,0 +1,87 @@
// Package lz4block provides LZ4 BlockSize types and pools of buffers.
package lz4block

import "sync"

const (
	Block64Kb uint32 = 1 << (16 + iota*2)
	Block256Kb
	Block1Mb
	Block4Mb
)

var (
	BlockPool64K  = sync.Pool{New: func() interface{} { return make([]byte, Block64Kb) }}
	BlockPool256K = sync.Pool{New: func() interface{} { return make([]byte, Block256Kb) }}
	BlockPool1M   = sync.Pool{New: func() interface{} { return make([]byte, Block1Mb) }}
	BlockPool4M   = sync.Pool{New: func() interface{} { return make([]byte, Block4Mb) }}
)

func Index(b uint32) BlockSizeIndex {
	switch b {
	case Block64Kb:
		return 4
	case Block256Kb:
		return 5
	case Block1Mb:
		return 6
	case Block4Mb:
		return 7
	}
	return 0
}

func IsValid(b uint32) bool {
	return Index(b) > 0
}

type BlockSizeIndex uint8

func (b BlockSizeIndex) IsValid() bool {
	switch b {
	case 4, 5, 6, 7:
		return true
	}
	return false
}

func (b BlockSizeIndex) Get() []byte {
	var buf interface{}
	switch b {
	case 4:
		buf = BlockPool64K.Get()
	case 5:
		buf = BlockPool256K.Get()
	case 6:
		buf = BlockPool1M.Get()
	case 7:
		buf = BlockPool4M.Get()
	}
	return buf.([]byte)
}

func (b BlockSizeIndex) Put(buf []byte) {
	// Safeguard: do not allow invalid buffers.
	switch c := uint32(cap(buf)); b {
	case 4:
		if c == Block64Kb {
			BlockPool64K.Put(buf[:c])
		}
	case 5:
		if c == Block256Kb {
			BlockPool256K.Put(buf[:c])
		}
	case 6:
		if c == Block1Mb {
			BlockPool1M.Put(buf[:c])
		}
	case 7:
		if c == Block4Mb {
			BlockPool4M.Put(buf[:c])
		}
	}
}

type CompressionLevel uint32

const Fast CompressionLevel = 0
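A short sketch of how these pools are meant to be used: map a frame block size to its descriptor index, borrow a buffer for the duration of a block, then hand it back. Note that lz4block is an internal package, so this only compiles from inside the module; it is shown purely for illustration.

package main

import (
	"fmt"

	"github.com/pierrec/lz4/v4/internal/lz4block"
)

func main() {
	idx := lz4block.Index(lz4block.Block4Mb) // 7, per the frame descriptor encoding
	buf := idx.Get()                         // borrow a 4 MB buffer from the pool
	fmt.Println(len(buf))                    // 4194304
	idx.Put(buf)                             // return it for reuse
}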
@@ -1,6 +1,6 @@
// +build !amd64 appengine !gc noasm

-package lz4
+package lz4block

func decodeBlock(dst, src []byte) (ret int) {
	const hasError = -2
@@ -0,0 +1,19 @@
package lz4errors

type Error string

func (e Error) Error() string { return string(e) }

const (
	ErrInvalidSourceShortBuffer      Error = "lz4: invalid source or destination buffer too short"
	ErrInvalidFrame                  Error = "lz4: bad magic number"
	ErrInternalUnhandledState        Error = "lz4: unhandled state"
	ErrInvalidHeaderChecksum         Error = "lz4: invalid header checksum"
	ErrInvalidBlockChecksum          Error = "lz4: invalid block checksum"
	ErrInvalidFrameChecksum          Error = "lz4: invalid frame checksum"
	ErrOptionInvalidCompressionLevel Error = "lz4: invalid compression level"
	ErrOptionClosedOrError           Error = "lz4: cannot apply options on closed or in error object"
	ErrOptionInvalidBlockSize        Error = "lz4: invalid block size"
	ErrOptionNotApplicable           Error = "lz4: option not applicable"
	ErrWriterNotClosed               Error = "lz4: writer not closed"
)
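Because these sentinels are typed string constants, they can live in a const block, and callers can still match them after wrapping; the frame code further down wraps them with %w. An illustrative sketch (lz4errors is internal, so this only compiles from inside the module):

package main

import (
	"errors"
	"fmt"

	"github.com/pierrec/lz4/v4/internal/lz4errors"
)

func main() {
	// Wrapped the same way frame.go does it ("%w: got %x; expected %x").
	err := fmt.Errorf("%w: got %x; expected %x", lz4errors.ErrInvalidFrameChecksum, 0x1234, 0x5678)
	fmt.Println(errors.Is(err, lz4errors.ErrInvalidFrameChecksum)) // true
}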
@ -0,0 +1,377 @@ |
||||
// Package lz4stream provides the types that support reading and writing LZ4 data streams.
|
||||
package lz4stream |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"io/ioutil" |
||||
|
||||
"github.com/pierrec/lz4/v4/internal/lz4block" |
||||
"github.com/pierrec/lz4/v4/internal/lz4errors" |
||||
"github.com/pierrec/lz4/v4/internal/xxh32" |
||||
) |
||||
|
||||
//go:generate go run gen.go
|
||||
|
||||
const ( |
||||
frameMagic uint32 = 0x184D2204 |
||||
frameSkipMagic uint32 = 0x184D2A50 |
||||
) |
||||
|
||||
func NewFrame() *Frame { |
||||
return &Frame{} |
||||
} |
||||
|
||||
type Frame struct { |
||||
buf [15]byte // frame descriptor needs at most 4(magic)+4+8+1=11 bytes
|
||||
Magic uint32 |
||||
Descriptor FrameDescriptor |
||||
Blocks Blocks |
||||
Checksum uint32 |
||||
checksum xxh32.XXHZero |
||||
} |
||||
|
||||
// Reset allows reusing the Frame.
|
||||
// The Descriptor configuration is not modified.
|
||||
func (f *Frame) Reset(num int) { |
||||
f.Magic = 0 |
||||
f.Descriptor.Checksum = 0 |
||||
f.Descriptor.ContentSize = 0 |
||||
_ = f.Blocks.closeW(f, num) |
||||
f.Checksum = 0 |
||||
} |
||||
|
||||
func (f *Frame) InitW(dst io.Writer, num int) { |
||||
f.Magic = frameMagic |
||||
f.Descriptor.initW() |
||||
f.Blocks.initW(f, dst, num) |
||||
f.checksum.Reset() |
||||
} |
||||
|
||||
func (f *Frame) CloseW(dst io.Writer, num int) error { |
||||
if err := f.Blocks.closeW(f, num); err != nil { |
||||
return err |
||||
} |
||||
buf := f.buf[:0] |
||||
// End mark (data block size of uint32(0)).
|
||||
buf = append(buf, 0, 0, 0, 0) |
||||
if f.Descriptor.Flags.ContentChecksum() { |
||||
buf = f.checksum.Sum(buf) |
||||
} |
||||
_, err := dst.Write(buf) |
||||
return err |
||||
} |
||||
|
||||
func (f *Frame) InitR(src io.Reader) error { |
||||
if f.Magic > 0 { |
||||
// Header already read.
|
||||
return nil |
||||
} |
||||
|
||||
newFrame: |
||||
var err error |
||||
if f.Magic, err = f.readUint32(src); err != nil { |
||||
return err |
||||
} |
||||
switch m := f.Magic; { |
||||
case m == frameMagic: |
||||
// All 16 values of frameSkipMagic are valid.
|
||||
case m>>8 == frameSkipMagic>>8: |
||||
var skip uint32 |
||||
if err := binary.Read(src, binary.LittleEndian, &skip); err != nil { |
||||
return err |
||||
} |
||||
if _, err := io.CopyN(ioutil.Discard, src, int64(skip)); err != nil { |
||||
return err |
||||
} |
||||
goto newFrame |
||||
default: |
||||
return lz4errors.ErrInvalidFrame |
||||
} |
||||
if err := f.Descriptor.initR(f, src); err != nil { |
||||
return err |
||||
} |
||||
f.Blocks.initR(f) |
||||
f.checksum.Reset() |
||||
return nil |
||||
} |
||||
|
||||
func (f *Frame) CloseR(src io.Reader) (err error) { |
||||
if !f.Descriptor.Flags.ContentChecksum() { |
||||
return nil |
||||
} |
||||
if f.Checksum, err = f.readUint32(src); err != nil { |
||||
return err |
||||
} |
||||
if c := f.checksum.Sum32(); c != f.Checksum { |
||||
return fmt.Errorf("%w: got %x; expected %x", lz4errors.ErrInvalidFrameChecksum, c, f.Checksum) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
type FrameDescriptor struct { |
||||
Flags DescriptorFlags |
||||
ContentSize uint64 |
||||
Checksum uint8 |
||||
} |
||||
|
||||
func (fd *FrameDescriptor) initW() { |
||||
fd.Flags.VersionSet(1) |
||||
fd.Flags.BlockIndependenceSet(true) |
||||
} |
||||
|
||||
func (fd *FrameDescriptor) Write(f *Frame, dst io.Writer) error { |
||||
if fd.Checksum > 0 { |
||||
// Header already written.
|
||||
return nil |
||||
} |
||||
|
||||
buf := f.buf[:4+2] |
||||
// Write the magic number here even though it belongs to the Frame.
|
||||
binary.LittleEndian.PutUint32(buf, f.Magic) |
||||
binary.LittleEndian.PutUint16(buf[4:], uint16(fd.Flags)) |
||||
|
||||
if fd.Flags.Size() { |
||||
buf = buf[:4+2+8] |
||||
binary.LittleEndian.PutUint64(buf[4+2:], fd.ContentSize) |
||||
} |
||||
fd.Checksum = descriptorChecksum(buf[4:]) |
||||
buf = append(buf, fd.Checksum) |
||||
|
||||
_, err := dst.Write(buf) |
||||
return err |
||||
} |
||||
|
||||
func (fd *FrameDescriptor) initR(f *Frame, src io.Reader) error { |
||||
// Read the flags and the checksum, hoping that there is no content size.
|
||||
buf := f.buf[:3] |
||||
if _, err := io.ReadFull(src, buf); err != nil { |
||||
return err |
||||
} |
||||
descr := binary.LittleEndian.Uint16(buf) |
||||
fd.Flags = DescriptorFlags(descr) |
||||
if fd.Flags.Size() { |
||||
// Append the 8 missing bytes.
|
||||
buf = buf[:3+8] |
||||
if _, err := io.ReadFull(src, buf[3:]); err != nil { |
||||
return err |
||||
} |
||||
fd.ContentSize = binary.LittleEndian.Uint64(buf[2:]) |
||||
} |
||||
fd.Checksum = buf[len(buf)-1] // the checksum is the last byte
|
||||
buf = buf[:len(buf)-1] // all descriptor fields except checksum
|
||||
if c := descriptorChecksum(buf); fd.Checksum != c { |
||||
return fmt.Errorf("%w: got %x; expected %x", lz4errors.ErrInvalidHeaderChecksum, c, fd.Checksum) |
||||
} |
||||
// Validate the elements that can be.
|
||||
if idx := fd.Flags.BlockSizeIndex(); !idx.IsValid() { |
||||
return lz4errors.ErrOptionInvalidBlockSize |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func descriptorChecksum(buf []byte) byte { |
||||
return byte(xxh32.ChecksumZero(buf) >> 8) |
||||
} |
||||
|
||||
type Blocks struct { |
||||
Block *FrameDataBlock |
||||
Blocks chan chan *FrameDataBlock |
||||
err error |
||||
} |
||||
|
||||
func (b *Blocks) initW(f *Frame, dst io.Writer, num int) { |
||||
size := f.Descriptor.Flags.BlockSizeIndex() |
||||
if num == 1 { |
||||
b.Blocks = nil |
||||
b.Block = NewFrameDataBlock(size) |
||||
return |
||||
} |
||||
b.Block = nil |
||||
if cap(b.Blocks) != num { |
||||
b.Blocks = make(chan chan *FrameDataBlock, num) |
||||
} |
||||
// goroutine managing concurrent block compression goroutines.
|
||||
go func() { |
||||
// Process next block compression item.
|
||||
for c := range b.Blocks { |
||||
// Read the next compressed block result.
|
||||
// Waiting here ensures that the blocks are output in the order they were sent.
|
||||
// The incoming channel is always closed as it indicates to the caller that
|
||||
// the block has been processed.
|
||||
block := <-c |
||||
if block == nil { |
||||
// Notify the block compression routine that we are done with its result.
|
||||
// This is used when a sentinel block is sent to terminate the compression.
|
||||
close(c) |
||||
return |
||||
} |
||||
// Do not attempt to write the block upon any previous failure.
|
||||
if b.err == nil { |
||||
// Write the block.
|
||||
if err := block.Write(f, dst); err != nil && b.err == nil { |
||||
// Keep the first error.
|
||||
b.err = err |
||||
// All pending compression goroutines need to shut down, so we need to keep going.
|
||||
} |
||||
} |
||||
close(c) |
||||
} |
||||
}() |
||||
} |
||||
|
||||
func (b *Blocks) closeW(f *Frame, num int) error { |
||||
if num == 1 { |
||||
if b.Block == nil { |
||||
// Not initialized yet.
|
||||
return nil |
||||
} |
||||
b.Block.CloseW(f) |
||||
return nil |
||||
} |
||||
if b.Blocks == nil { |
||||
// Not initialized yet.
|
||||
return nil |
||||
} |
||||
c := make(chan *FrameDataBlock) |
||||
b.Blocks <- c |
||||
c <- nil |
||||
<-c |
||||
err := b.err |
||||
b.err = nil |
||||
return err |
||||
} |
||||
|
||||
func (b *Blocks) initR(f *Frame) { |
||||
size := f.Descriptor.Flags.BlockSizeIndex() |
||||
b.Block = NewFrameDataBlock(size) |
||||
} |
||||
|
||||
func NewFrameDataBlock(size lz4block.BlockSizeIndex) *FrameDataBlock { |
||||
buf := size.Get() |
||||
return &FrameDataBlock{Data: buf, data: buf} |
||||
} |
||||
|
||||
type FrameDataBlock struct { |
||||
Size DataBlockSize |
||||
Data []byte // compressed or uncompressed data (.data or .src)
|
||||
Checksum uint32 |
||||
data []byte // buffer for compressed data
|
||||
src []byte // uncompressed data
|
||||
} |
||||
|
||||
func (b *FrameDataBlock) CloseW(f *Frame) { |
||||
if b.data != nil { |
||||
// Block was not already closed.
|
||||
size := f.Descriptor.Flags.BlockSizeIndex() |
||||
size.Put(b.data) |
||||
b.Data = nil |
||||
b.data = nil |
||||
b.src = nil |
||||
} |
||||
} |
||||
|
||||
// Block compression errors are ignored since the buffer is sized appropriately.
|
||||
func (b *FrameDataBlock) Compress(f *Frame, src []byte, level lz4block.CompressionLevel) *FrameDataBlock { |
||||
data := b.data[:len(src)] // trigger the incompressible flag in CompressBlock
|
||||
var n int |
||||
switch level { |
||||
case lz4block.Fast: |
||||
n, _ = lz4block.CompressBlock(src, data, nil) |
||||
default: |
||||
n, _ = lz4block.CompressBlockHC(src, data, level, nil, nil) |
||||
} |
||||
if n == 0 { |
||||
b.Size.UncompressedSet(true) |
||||
b.Data = src |
||||
} else { |
||||
b.Size.UncompressedSet(false) |
||||
b.Data = data[:n] |
||||
} |
||||
b.Size.sizeSet(len(b.Data)) |
||||
b.src = src // keep track of the source for content checksum
|
||||
|
||||
if f.Descriptor.Flags.BlockChecksum() { |
||||
b.Checksum = xxh32.ChecksumZero(src) |
||||
} |
||||
return b |
||||
} |
||||
|
||||
func (b *FrameDataBlock) Write(f *Frame, dst io.Writer) error { |
||||
if f.Descriptor.Flags.ContentChecksum() { |
||||
_, _ = f.checksum.Write(b.src) |
||||
} |
||||
buf := f.buf[:] |
||||
binary.LittleEndian.PutUint32(buf, uint32(b.Size)) |
||||
if _, err := dst.Write(buf[:4]); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if _, err := dst.Write(b.Data); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if b.Checksum == 0 { |
||||
return nil |
||||
} |
||||
binary.LittleEndian.PutUint32(buf, b.Checksum) |
||||
_, err := dst.Write(buf[:4]) |
||||
return err |
||||
} |
||||
|
||||
func (b *FrameDataBlock) Uncompress(f *Frame, src io.Reader, dst []byte) (int, error) { |
||||
x, err := f.readUint32(src) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
b.Size = DataBlockSize(x) |
||||
if b.Size == 0 { |
||||
// End of frame reached.
|
||||
return 0, io.EOF |
||||
} |
||||
|
||||
isCompressed := !b.Size.Uncompressed() |
||||
size := b.Size.size() |
||||
var data []byte |
||||
if isCompressed { |
||||
// Data is first copied into b.Data and then it will get uncompressed into dst.
|
||||
data = b.Data |
||||
} else { |
||||
// Data is directly copied into dst as it is not compressed.
|
||||
data = dst |
||||
} |
||||
data = data[:size] |
||||
if _, err := io.ReadFull(src, data); err != nil { |
||||
return 0, err |
||||
} |
||||
if isCompressed { |
||||
n, err := lz4block.UncompressBlock(data, dst) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
data = dst[:n] |
||||
} |
||||
|
||||
if f.Descriptor.Flags.BlockChecksum() { |
||||
var err error |
||||
if b.Checksum, err = f.readUint32(src); err != nil { |
||||
return 0, err |
||||
} |
||||
if c := xxh32.ChecksumZero(data); c != b.Checksum { |
||||
return 0, fmt.Errorf("%w: got %x; expected %x", lz4errors.ErrInvalidBlockChecksum, c, b.Checksum) |
||||
} |
||||
} |
||||
if f.Descriptor.Flags.ContentChecksum() { |
||||
_, _ = f.checksum.Write(data) |
||||
} |
||||
return len(data), nil |
||||
} |
||||
|
||||
func (f *Frame) readUint32(r io.Reader) (x uint32, err error) { |
||||
if _, err = io.ReadFull(r, f.buf[:4]); err != nil { |
||||
return |
||||
} |
||||
x = binary.LittleEndian.Uint32(f.buf[:4]) |
||||
return |
||||
} |
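The Blocks.initW goroutine above uses a channel of channels so that blocks compressed concurrently are still written in submission order: each compression goroutine gets its own result channel, and the single writer drains those channels in the order they were enqueued. A stripped-down, hypothetical sketch of the same pattern (names are made up and not part of the library):

// Ordered fan-in: results are consumed in the order work was submitted,
// regardless of which worker finishes first.
func compressOrdered(chunks [][]byte, compress func([]byte) []byte) [][]byte {
	queue := make(chan chan []byte, len(chunks)) // channel of per-chunk result channels

	for _, c := range chunks {
		res := make(chan []byte, 1)
		queue <- res
		go func(c []byte, res chan []byte) {
			res <- compress(c) // may complete out of order
		}(c, res)
	}
	close(queue)

	var out [][]byte
	for res := range queue { // drained in submission order
		out = append(out, <-res)
	}
	return out
}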
@ -0,0 +1,103 @@ |
||||
// Code generated by `gen.exe`. DO NOT EDIT.
|
||||
|
||||
package lz4stream |
||||
|
||||
import "github.com/pierrec/lz4/v4/internal/lz4block" |
||||
|
||||
// DescriptorFlags is defined as follow:
|
||||
// field bits
|
||||
// ----- ----
|
||||
// _ 2
|
||||
// ContentChecksum 1
|
||||
// Size 1
|
||||
// BlockChecksum 1
|
||||
// BlockIndependence 1
|
||||
// Version 2
|
||||
// _ 4
|
||||
// BlockSizeIndex 3
|
||||
// _ 1
|
||||
type DescriptorFlags uint16 |
||||
|
||||
// Getters.
|
||||
func (x DescriptorFlags) ContentChecksum() bool { return x>>2&1 != 0 } |
||||
func (x DescriptorFlags) Size() bool { return x>>3&1 != 0 } |
||||
func (x DescriptorFlags) BlockChecksum() bool { return x>>4&1 != 0 } |
||||
func (x DescriptorFlags) BlockIndependence() bool { return x>>5&1 != 0 } |
||||
func (x DescriptorFlags) Version() uint16 { return uint16(x >> 6 & 0x3) } |
||||
func (x DescriptorFlags) BlockSizeIndex() lz4block.BlockSizeIndex { |
||||
return lz4block.BlockSizeIndex(x >> 12 & 0x7) |
||||
} |
||||
|
||||
// Setters.
|
||||
func (x *DescriptorFlags) ContentChecksumSet(v bool) *DescriptorFlags { |
||||
const b = 1 << 2 |
||||
if v { |
||||
*x = *x&^b | b |
||||
} else { |
||||
*x &^= b |
||||
} |
||||
return x |
||||
} |
||||
func (x *DescriptorFlags) SizeSet(v bool) *DescriptorFlags { |
||||
const b = 1 << 3 |
||||
if v { |
||||
*x = *x&^b | b |
||||
} else { |
||||
*x &^= b |
||||
} |
||||
return x |
||||
} |
||||
func (x *DescriptorFlags) BlockChecksumSet(v bool) *DescriptorFlags { |
||||
const b = 1 << 4 |
||||
if v { |
||||
*x = *x&^b | b |
||||
} else { |
||||
*x &^= b |
||||
} |
||||
return x |
||||
} |
||||
func (x *DescriptorFlags) BlockIndependenceSet(v bool) *DescriptorFlags { |
||||
const b = 1 << 5 |
||||
if v { |
||||
*x = *x&^b | b |
||||
} else { |
||||
*x &^= b |
||||
} |
||||
return x |
||||
} |
||||
func (x *DescriptorFlags) VersionSet(v uint16) *DescriptorFlags { |
||||
*x = *x&^(0x3<<6) | (DescriptorFlags(v) & 0x3 << 6) |
||||
return x |
||||
} |
||||
func (x *DescriptorFlags) BlockSizeIndexSet(v lz4block.BlockSizeIndex) *DescriptorFlags { |
||||
*x = *x&^(0x7<<12) | (DescriptorFlags(v) & 0x7 << 12) |
||||
return x |
||||
} |
||||
|
||||
// Code generated by `gen.exe`. DO NOT EDIT.
|
||||
|
||||
// DataBlockSize is defined as follow:
|
||||
// field bits
|
||||
// ----- ----
|
||||
// size 31
|
||||
// Uncompressed 1
|
||||
type DataBlockSize uint32 |
||||
|
||||
// Getters.
|
||||
func (x DataBlockSize) size() int { return int(x & 0x7FFFFFFF) } |
||||
func (x DataBlockSize) Uncompressed() bool { return x>>31&1 != 0 } |
||||
|
||||
// Setters.
|
||||
func (x *DataBlockSize) sizeSet(v int) *DataBlockSize { |
||||
*x = *x&^0x7FFFFFFF | DataBlockSize(v)&0x7FFFFFFF |
||||
return x |
||||
} |
||||
func (x *DataBlockSize) UncompressedSet(v bool) *DataBlockSize { |
||||
const b = 1 << 31 |
||||
if v { |
||||
*x = *x&^b | b |
||||
} else { |
||||
*x &^= b |
||||
} |
||||
return x |
||||
} |
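The generated DescriptorFlags type packs the LZ4 frame-descriptor bit fields into a uint16 with getter/setter pairs. A small illustration of how the setters compose (illustrative only; lz4stream and lz4block are internal packages):

package main

import (
	"fmt"

	"github.com/pierrec/lz4/v4/internal/lz4block"
	"github.com/pierrec/lz4/v4/internal/lz4stream"
)

func main() {
	var flags lz4stream.DescriptorFlags
	flags.VersionSet(1).
		BlockIndependenceSet(true).
		ContentChecksumSet(true).
		BlockSizeIndexSet(lz4block.Index(lz4block.Block4Mb)) // index 7

	fmt.Println(flags.Version())           // 1
	fmt.Println(flags.BlockIndependence()) // true
	fmt.Println(flags.BlockSizeIndex())    // 7
}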
@@ -0,0 +1,99 @@
// Package lz4 implements reading and writing lz4 compressed data (a frame),
// as specified in http://fastcompression.blogspot.fr/2013/04/lz4-streaming-format-final.html.
//
// Although the block level compression and decompression functions are exposed and are fully compatible
// with the lz4 block format definition, they are low level and should not be used directly.
// For a complete description of an lz4 compressed block, see:
// http://fastcompression.blogspot.fr/2011/05/lz4-explained.html
//
// See https://github.com/lz4/lz4 for the reference C implementation.
//
package lz4

import (
	"github.com/pierrec/lz4/v4/internal/lz4block"
	"github.com/pierrec/lz4/v4/internal/lz4errors"
)

func _() {
	// Safety checks for duplicated elements.
	var x [1]struct{}
	_ = x[lz4block.CompressionLevel(Fast)-lz4block.Fast]
	_ = x[Block64Kb-BlockSize(lz4block.Block64Kb)]
	_ = x[Block256Kb-BlockSize(lz4block.Block256Kb)]
	_ = x[Block1Mb-BlockSize(lz4block.Block1Mb)]
	_ = x[Block4Mb-BlockSize(lz4block.Block4Mb)]
}

// CompressBlockBound returns the maximum size of a given buffer of size n, when not compressible.
func CompressBlockBound(n int) int {
	return lz4block.CompressBlockBound(n)
}

// UncompressBlock uncompresses the source buffer into the destination one,
// and returns the uncompressed size.
//
// The destination buffer must be sized appropriately.
//
// An error is returned if the source data is invalid or the destination buffer is too small.
func UncompressBlock(src, dst []byte) (int, error) {
	return lz4block.UncompressBlock(src, dst)
}

// CompressBlock compresses the source buffer into the destination one.
// This is the fast version of LZ4 compression and also the default one.
//
// The argument hashTable is scratch space for a hash table used by the
// compressor. If provided, it should have length at least 1<<16. If it is
// shorter (or nil), CompressBlock allocates its own hash table.
//
// The size of the compressed data is returned.
//
// If the destination buffer size is lower than CompressBlockBound and
// the compressed size is 0 and no error, then the data is incompressible.
//
// An error is returned if the destination buffer is too small.
func CompressBlock(src, dst []byte, hashTable []int) (int, error) {
	return lz4block.CompressBlock(src, dst, hashTable)
}

// CompressBlockHC compresses the source buffer src into the destination dst
// with max search depth (use 0 or negative value for no max).
//
// CompressBlockHC compression ratio is better than CompressBlock but it is also slower.
//
// The size of the compressed data is returned.
//
// If the destination buffer size is lower than CompressBlockBound and
// the compressed size is 0 and no error, then the data is incompressible.
//
// An error is returned if the destination buffer is too small.
func CompressBlockHC(src, dst []byte, depth CompressionLevel, hashTable, chainTable []int) (int, error) {
	return lz4block.CompressBlockHC(src, dst, lz4block.CompressionLevel(depth), hashTable, chainTable)
}

const (
	// ErrInvalidSourceShortBuffer is returned by UncompressBlock or CompressBlock when a compressed
	// block is corrupted or the destination buffer is not large enough for the uncompressed data.
	ErrInvalidSourceShortBuffer = lz4errors.ErrInvalidSourceShortBuffer
	// ErrInvalidFrame is returned when reading an invalid LZ4 archive.
	ErrInvalidFrame = lz4errors.ErrInvalidFrame
	// ErrInternalUnhandledState is an internal error.
	ErrInternalUnhandledState = lz4errors.ErrInternalUnhandledState
	// ErrInvalidHeaderChecksum is returned when reading a frame.
	ErrInvalidHeaderChecksum = lz4errors.ErrInvalidHeaderChecksum
	// ErrInvalidBlockChecksum is returned when reading a frame.
	ErrInvalidBlockChecksum = lz4errors.ErrInvalidBlockChecksum
	// ErrInvalidFrameChecksum is returned when reading a frame.
	ErrInvalidFrameChecksum = lz4errors.ErrInvalidFrameChecksum
	// ErrOptionInvalidCompressionLevel is returned when the supplied compression level is invalid.
	ErrOptionInvalidCompressionLevel = lz4errors.ErrOptionInvalidCompressionLevel
	// ErrOptionClosedOrError is returned when an option is applied to a closed or in error object.
	ErrOptionClosedOrError = lz4errors.ErrOptionClosedOrError
	// ErrOptionInvalidBlockSize is returned when an invalid block size is provided.
	ErrOptionInvalidBlockSize = lz4errors.ErrOptionInvalidBlockSize
	// ErrOptionNotApplicable is returned when trying to apply an option to an object not supporting it.
	ErrOptionNotApplicable = lz4errors.ErrOptionNotApplicable
	// ErrWriterNotClosed is returned when attempting to reset an unclosed writer.
	ErrWriterNotClosed = lz4errors.ErrWriterNotClosed
)
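The package-level block functions simply forward to lz4block. A minimal round trip using them, following the documented contract (size dst with CompressBlockBound, treat n == 0 as incompressible):

package main

import (
	"bytes"
	"fmt"

	"github.com/pierrec/lz4/v4"
)

func main() {
	src := bytes.Repeat([]byte("loki loves lz4 "), 100)

	// Compress: dst is sized with CompressBlockBound so compression cannot fail
	// for lack of space; n == 0 would mean the input is incompressible.
	dst := make([]byte, lz4.CompressBlockBound(len(src)))
	n, err := lz4.CompressBlock(src, dst, nil)
	if err != nil || n == 0 {
		panic("compression failed or data incompressible")
	}

	// Decompress: the destination must be large enough for the original data.
	out := make([]byte, len(src))
	m, err := lz4.UncompressBlock(dst[:n], out)
	if err != nil {
		panic(err)
	}
	fmt.Println(m == len(src), bytes.Equal(out[:m], src)) // true true
}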
@ -0,0 +1,191 @@ |
||||
package lz4 |
||||
|
||||
import ( |
||||
"fmt" |
||||
"reflect" |
||||
"runtime" |
||||
|
||||
"github.com/pierrec/lz4/v4/internal/lz4block" |
||||
"github.com/pierrec/lz4/v4/internal/lz4errors" |
||||
) |
||||
|
||||
//go:generate go run golang.org/x/tools/cmd/stringer -type=BlockSize,CompressionLevel -output options_gen.go
|
||||
|
||||
type ( |
||||
applier interface { |
||||
Apply(...Option) error |
||||
private() |
||||
} |
||||
// Option defines the parameters to setup an LZ4 Writer or Reader.
|
||||
Option func(applier) error |
||||
) |
||||
|
||||
// String returns a string representation of the option with its parameter(s).
|
||||
func (o Option) String() string { |
||||
return o(nil).Error() |
||||
} |
||||
|
||||
// Default options.
|
||||
var ( |
||||
DefaultBlockSizeOption = BlockSizeOption(Block4Mb) |
||||
DefaultChecksumOption = ChecksumOption(true) |
||||
DefaultConcurrency = ConcurrencyOption(1) |
||||
defaultOnBlockDone = OnBlockDoneOption(nil) |
||||
) |
||||
|
||||
const ( |
||||
Block64Kb BlockSize = 1 << (16 + iota*2) |
||||
Block256Kb |
||||
Block1Mb |
||||
Block4Mb |
||||
) |
||||
|
||||
// BlockSize defines the size of the blocks to be compressed.
|
||||
type BlockSize uint32 |
||||
|
||||
// BlockSizeOption defines the maximum size of compressed blocks (default=Block4Mb).
|
||||
func BlockSizeOption(size BlockSize) Option { |
||||
return func(a applier) error { |
||||
switch w := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("BlockSizeOption(%s)", size) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
size := uint32(size) |
||||
if !lz4block.IsValid(size) { |
||||
return fmt.Errorf("%w: %d", lz4errors.ErrOptionInvalidBlockSize, size) |
||||
} |
||||
w.frame.Descriptor.Flags.BlockSizeIndexSet(lz4block.Index(size)) |
||||
return nil |
||||
} |
||||
return lz4errors.ErrOptionNotApplicable |
||||
} |
||||
} |
||||
|
||||
// BlockChecksumOption enables or disables block checksum (default=false).
|
||||
func BlockChecksumOption(flag bool) Option { |
||||
return func(a applier) error { |
||||
switch w := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("BlockChecksumOption(%v)", flag) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
w.frame.Descriptor.Flags.BlockChecksumSet(flag) |
||||
return nil |
||||
} |
||||
return lz4errors.ErrOptionNotApplicable |
||||
} |
||||
} |
||||
|
||||
// ChecksumOption enables/disables all blocks or content checksum (default=true).
|
||||
func ChecksumOption(flag bool) Option { |
||||
return func(a applier) error { |
||||
switch w := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("BlockChecksumOption(%v)", flag) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
w.frame.Descriptor.Flags.ContentChecksumSet(flag) |
||||
return nil |
||||
} |
||||
return lz4errors.ErrOptionNotApplicable |
||||
} |
||||
} |
||||
|
||||
// SizeOption sets the size of the original uncompressed data (default=0). It is useful to know the size of the
|
||||
// whole uncompressed data stream.
|
||||
func SizeOption(size uint64) Option { |
||||
return func(a applier) error { |
||||
switch w := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("SizeOption(%d)", size) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
w.frame.Descriptor.Flags.SizeSet(size > 0) |
||||
w.frame.Descriptor.ContentSize = size |
||||
return nil |
||||
} |
||||
return lz4errors.ErrOptionNotApplicable |
||||
} |
||||
} |
||||
|
||||
// ConcurrencyOption sets the number of go routines used for compression.
|
||||
// If n<0, then the output of runtime.GOMAXPROCS(0) is used.
|
||||
func ConcurrencyOption(n int) Option { |
||||
return func(a applier) error { |
||||
switch w := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("ConcurrencyOption(%d)", n) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
switch n { |
||||
case 0, 1: |
||||
default: |
||||
if n < 0 { |
||||
n = runtime.GOMAXPROCS(0) |
||||
} |
||||
} |
||||
w.num = n |
||||
return nil |
||||
} |
||||
return lz4errors.ErrOptionNotApplicable |
||||
} |
||||
} |
||||
|
||||
// CompressionLevel defines the level of compression to use. The higher the better, but slower, compression.
|
||||
type CompressionLevel uint32 |
||||
|
||||
const ( |
||||
Fast CompressionLevel = 0 |
||||
Level1 CompressionLevel = 1 << (8 + iota) |
||||
Level2 |
||||
Level3 |
||||
Level4 |
||||
Level5 |
||||
Level6 |
||||
Level7 |
||||
Level8 |
||||
Level9 |
||||
) |
||||
|
||||
// CompressionLevelOption defines the compression level (default=Fast).
|
||||
func CompressionLevelOption(level CompressionLevel) Option { |
||||
return func(a applier) error { |
||||
switch w := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("CompressionLevelOption(%s)", level) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
switch level { |
||||
case Fast, Level1, Level2, Level3, Level4, Level5, Level6, Level7, Level8, Level9: |
||||
default: |
||||
return fmt.Errorf("%w: %d", lz4errors.ErrOptionInvalidCompressionLevel, level) |
||||
} |
||||
w.level = lz4block.CompressionLevel(level) |
||||
return nil |
||||
} |
||||
return lz4errors.ErrOptionNotApplicable |
||||
} |
||||
} |
||||
|
||||
func onBlockDone(int) {} |
||||
|
||||
// OnBlockDoneOption is triggered when a block has been processed. For a Writer, it is when it has been compressed,
|
||||
// for a Reader, it is when it has been uncompressed.
|
||||
func OnBlockDoneOption(handler func(size int)) Option { |
||||
if handler == nil { |
||||
handler = onBlockDone |
||||
} |
||||
return func(a applier) error { |
||||
switch rw := a.(type) { |
||||
case nil: |
||||
s := fmt.Sprintf("OnBlockDoneOption(%s)", reflect.TypeOf(handler).String()) |
||||
return lz4errors.Error(s) |
||||
case *Writer: |
||||
rw.handler = handler |
||||
case *Reader: |
||||
rw.handler = handler |
||||
} |
||||
return nil |
||||
} |
||||
} |
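Options are plain functions applied to a Writer or Reader through Apply, and Option.String works by applying the option to a nil target so it reports itself as an error value. A hedged sketch of configuring a writer with these options; lz4.NewWriter and Writer.Apply are assumed from the v4 API surface, while the option constructors are the ones defined above.

package main

import (
	"fmt"
	"os"

	"github.com/pierrec/lz4/v4"
)

func main() {
	// Option.String: applying to nil returns the option's description.
	fmt.Println(lz4.ConcurrencyOption(-1)) // ConcurrencyOption(-1)

	// NewWriter/Apply are assumed from the pierrec/lz4/v4 API surface.
	zw := lz4.NewWriter(os.Stdout)
	err := zw.Apply(
		lz4.ConcurrencyOption(-1),              // use runtime.GOMAXPROCS(0) goroutines
		lz4.CompressionLevelOption(lz4.Level9), // slower, better ratio than the default Fast
		lz4.ChecksumOption(false),              // disable the frame content checksum
	)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
	_ = zw
}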
@ -0,0 +1,92 @@ |
||||
// Code generated by "stringer -type=BlockSize,CompressionLevel -output options_gen.go"; DO NOT EDIT.
|
||||
|
||||
package lz4 |
||||
|
||||
import "strconv" |
||||
|
||||
func _() { |
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{} |
||||
_ = x[Block64Kb-65536] |
||||
_ = x[Block256Kb-262144] |
||||
_ = x[Block1Mb-1048576] |
||||
_ = x[Block4Mb-4194304] |
||||
} |
||||
|
||||
const ( |
||||
_BlockSize_name_0 = "Block64Kb" |
||||
_BlockSize_name_1 = "Block256Kb" |
||||
_BlockSize_name_2 = "Block1Mb" |
||||
_BlockSize_name_3 = "Block4Mb" |
||||
) |
||||
|
||||
func (i BlockSize) String() string { |
||||
switch { |
||||
case i == 65536: |
||||
return _BlockSize_name_0 |
||||
case i == 262144: |
||||
return _BlockSize_name_1 |
||||
case i == 1048576: |
||||
return _BlockSize_name_2 |
||||
case i == 4194304: |
||||
return _BlockSize_name_3 |
||||
default: |
||||
return "BlockSize(" + strconv.FormatInt(int64(i), 10) + ")" |
||||
} |
||||
} |
||||
func _() { |
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{} |
||||
_ = x[Fast-0] |
||||
_ = x[Level1-512] |
||||
_ = x[Level2-1024] |
||||
_ = x[Level3-2048] |
||||
_ = x[Level4-4096] |
||||
_ = x[Level5-8192] |
||||
_ = x[Level6-16384] |
||||
_ = x[Level7-32768] |
||||
_ = x[Level8-65536] |
||||
_ = x[Level9-131072] |
||||
} |
||||
|
||||
const ( |
||||
_CompressionLevel_name_0 = "Fast" |
||||
_CompressionLevel_name_1 = "Level1" |
||||
_CompressionLevel_name_2 = "Level2" |
||||
_CompressionLevel_name_3 = "Level3" |
||||
_CompressionLevel_name_4 = "Level4" |
||||
_CompressionLevel_name_5 = "Level5" |
||||
_CompressionLevel_name_6 = "Level6" |
||||
_CompressionLevel_name_7 = "Level7" |
||||
_CompressionLevel_name_8 = "Level8" |
||||
_CompressionLevel_name_9 = "Level9" |
||||
) |
||||
|
||||
func (i CompressionLevel) String() string { |
||||
switch { |
||||
case i == 0: |
||||
return _CompressionLevel_name_0 |
||||
case i == 512: |
||||
return _CompressionLevel_name_1 |
||||
case i == 1024: |
||||
return _CompressionLevel_name_2 |
||||
case i == 2048: |
||||
return _CompressionLevel_name_3 |
||||
case i == 4096: |
||||
return _CompressionLevel_name_4 |
||||
case i == 8192: |
||||
return _CompressionLevel_name_5 |
||||
case i == 16384: |
||||
return _CompressionLevel_name_6 |
||||
case i == 32768: |
||||
return _CompressionLevel_name_7 |
||||
case i == 65536: |
||||
return _CompressionLevel_name_8 |
||||
case i == 131072: |
||||
return _CompressionLevel_name_9 |
||||
default: |
||||
return "CompressionLevel(" + strconv.FormatInt(int64(i), 10) + ")" |
||||
} |
||||
} |
@ -0,0 +1,191 @@ |
||||
package lz4 |
||||
|
||||
import ( |
||||
"io" |
||||
|
||||
"github.com/pierrec/lz4/v4/internal/lz4errors" |
||||
"github.com/pierrec/lz4/v4/internal/lz4stream" |
||||
) |
||||
|
||||
var readerStates = []aState{ |
||||
noState: newState, |
||||
errorState: newState, |
||||
newState: readState, |
||||
readState: closedState, |
||||
closedState: newState, |
||||
} |
||||
|
||||
// NewReader returns a new LZ4 frame decoder.
|
||||
func NewReader(r io.Reader) *Reader { |
||||
zr := &Reader{frame: lz4stream.NewFrame()} |
||||
zr.state.init(readerStates) |
||||
_ = zr.Apply(defaultOnBlockDone) |
||||
zr.Reset(r) |
||||
return zr |
||||
} |
||||
|
||||
// Reader allows reading an LZ4 stream.
|
||||
type Reader struct { |
||||
state _State |
||||
src io.Reader // source reader
|
||||
frame *lz4stream.Frame // frame being read
|
||||
data []byte // pending data
|
||||
idx int // read index into pending data
|
||||
handler func(int) |
||||
} |
||||
|
||||
func (*Reader) private() {} |
||||
|
||||
func (r *Reader) Apply(options ...Option) (err error) { |
||||
defer r.state.check(&err) |
||||
switch r.state.state { |
||||
case newState: |
||||
case errorState: |
||||
return r.state.err |
||||
default: |
||||
return lz4errors.ErrOptionClosedOrError |
||||
} |
||||
for _, o := range options { |
||||
if err = o(r); err != nil { |
||||
return |
||||
} |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Size returns the size of the underlying uncompressed data, if set in the stream.
|
||||
func (r *Reader) Size() int { |
||||
switch r.state.state { |
||||
case readState, closedState: |
||||
if r.frame.Descriptor.Flags.Size() { |
||||
return int(r.frame.Descriptor.ContentSize) |
||||
} |
||||
} |
||||
return 0 |
||||
} |
||||
|
||||
func (r *Reader) init() error { |
||||
return r.frame.InitR(r.src) |
||||
} |
||||
|
||||
func (r *Reader) Read(buf []byte) (n int, err error) { |
||||
defer r.state.check(&err) |
||||
switch r.state.state { |
||||
case readState: |
||||
case closedState, errorState: |
||||
return 0, r.state.err |
||||
case newState: |
||||
// First initialization.
|
||||
if err = r.init(); r.state.next(err) { |
||||
return |
||||
} |
||||
size := r.frame.Descriptor.Flags.BlockSizeIndex() |
||||
r.data = size.Get() |
||||
default: |
||||
return 0, r.state.fail() |
||||
} |
||||
if len(buf) == 0 { |
||||
return |
||||
} |
||||
|
||||
var bn int |
||||
if r.idx > 0 { |
||||
// Some left over data, use it.
|
||||
goto fillbuf |
||||
} |
||||
// No uncompressed data yet.
|
||||
r.data = r.data[:cap(r.data)] |
||||
for len(buf) >= len(r.data) { |
||||
// Input buffer large enough and no pending data: uncompress directly into it.
|
||||
switch bn, err = r.frame.Blocks.Block.Uncompress(r.frame, r.src, buf); err { |
||||
case nil: |
||||
r.handler(bn) |
||||
n += bn |
||||
buf = buf[bn:] |
||||
case io.EOF: |
||||
goto close |
||||
default: |
||||
return |
||||
} |
||||
} |
||||
if n > 0 { |
||||
// Some data was read, done for now.
|
||||
return |
||||
} |
||||
// Read the next block.
|
||||
switch bn, err = r.frame.Blocks.Block.Uncompress(r.frame, r.src, r.data); err { |
||||
case nil: |
||||
r.handler(bn) |
||||
r.data = r.data[:bn] |
||||
goto fillbuf |
||||
case io.EOF: |
||||
default: |
||||
return |
||||
} |
||||
close: |
||||
if er := r.frame.CloseR(r.src); er != nil { |
||||
err = er |
||||
} |
||||
r.frame.Descriptor.Flags.BlockSizeIndex().Put(r.data) |
||||
r.Reset(nil) |
||||
return |
||||
fillbuf: |
||||
bn = copy(buf, r.data[r.idx:]) |
||||
n += bn |
||||
r.idx += bn |
||||
if r.idx == len(r.data) { |
||||
// All data read, get ready for the next Read.
|
||||
r.idx = 0 |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Reset clears the state of the Reader r such that it is equivalent to its
// initial state from NewReader, but instead reading from reader.
// No access to reader is performed.
||||
func (r *Reader) Reset(reader io.Reader) { |
||||
r.frame.Reset(1) |
||||
r.src = reader |
||||
r.data = nil |
||||
r.idx = 0 |
||||
r.state.reset() |
||||
} |
||||
|
||||
// WriteTo efficiently uncompresses the data from the Reader underlying source to w.
|
||||
func (r *Reader) WriteTo(w io.Writer) (n int64, err error) { |
||||
switch r.state.state { |
||||
case closedState, errorState: |
||||
return 0, r.state.err |
||||
case newState: |
||||
if err = r.init(); r.state.next(err) { |
||||
return |
||||
} |
||||
default: |
||||
return 0, r.state.fail() |
||||
} |
||||
defer r.state.nextd(&err) |
||||
|
||||
var bn int |
||||
block := r.frame.Blocks.Block |
||||
size := r.frame.Descriptor.Flags.BlockSizeIndex() |
||||
data := size.Get() |
||||
defer size.Put(data) |
||||
for { |
||||
switch bn, err = block.Uncompress(r.frame, r.src, data); err { |
||||
case nil: |
||||
case io.EOF: |
||||
err = r.frame.CloseR(r.src) |
||||
return |
||||
default: |
||||
return |
||||
} |
||||
r.handler(bn) |
||||
bn, err = w.Write(data[:bn]) |
||||
n += int64(bn) |
||||
if err != nil { |
||||
return |
||||
} |
||||
} |
||||
} |
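A hedged usage sketch (not part of this diff) for the Reader above; WriteTo streams the decompressed frame straight to an io.Writer, and the file name is a placeholder.

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/pierrec/lz4/v4"
)

// decompress copies the uncompressed content of an LZ4 frame from src to dst.
func decompress(dst io.Writer, src io.Reader) (int64, error) {
	zr := lz4.NewReader(src)
	// WriteTo uncompresses block by block into dst, avoiding the Read copy loop.
	return zr.WriteTo(dst)
}

func main() {
	in, err := os.Open("data.lz4") // placeholder input
	if err != nil {
		panic(err)
	}
	defer in.Close()

	n, err := decompress(os.Stdout, in)
	if err != nil {
		panic(err)
	}
	fmt.Fprintln(os.Stderr, "uncompressed bytes:", n)
}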
@ -0,0 +1,75 @@ |
||||
package lz4 |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
|
||||
"github.com/pierrec/lz4/v4/internal/lz4errors" |
||||
) |
||||
|
||||
//go:generate go run golang.org/x/tools/cmd/stringer -type=aState -output state_gen.go
|
||||
|
||||
const ( |
||||
noState aState = iota // uninitialized reader
|
||||
errorState // unrecoverable error encountered
|
||||
newState // instantiated object
|
||||
readState // reading data
|
||||
writeState // writing data
|
||||
closedState // all done
|
||||
) |
||||
|
||||
type ( |
||||
aState uint8 |
||||
_State struct { |
||||
states []aState |
||||
state aState |
||||
err error |
||||
} |
||||
) |
||||
|
||||
func (s *_State) init(states []aState) { |
||||
s.states = states |
||||
s.state = states[0] |
||||
} |
||||
|
||||
func (s *_State) reset() { |
||||
s.state = s.states[0] |
||||
s.err = nil |
||||
} |
||||
|
||||
// next sets the state to the next one unless it is passed a non-nil error.
// It reports whether the state machine is now in error.
|
||||
func (s *_State) next(err error) bool { |
||||
if err != nil { |
||||
s.err = fmt.Errorf("%s: %w", s.state, err) |
||||
s.state = errorState |
||||
return true |
||||
} |
||||
s.state = s.states[s.state] |
||||
return false |
||||
} |
||||
|
||||
// nextd is like next but for defers.
|
||||
func (s *_State) nextd(errp *error) bool { |
||||
return errp != nil && s.next(*errp) |
||||
} |
||||
|
||||
// check records a non-nil error on s, unless s is already in error, and moves s to the error state unless the error is io.EOF.
|
||||
func (s *_State) check(errp *error) { |
||||
if s.state == errorState || errp == nil { |
||||
return |
||||
} |
||||
if err := *errp; err != nil { |
||||
s.err = fmt.Errorf("%w[%s]", err, s.state) |
||||
if !errors.Is(err, io.EOF) { |
||||
s.state = errorState |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (s *_State) fail() error { |
||||
s.state = errorState |
||||
s.err = fmt.Errorf("%w[%s]", lz4errors.ErrInternalUnhandledState, s.state) |
||||
return s.err |
||||
} |
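An editorial sketch, outside the lz4 package and with duplicated toy types, of the mechanism above: the states slice acts as a transition table, and _State.next simply indexes the current state into it when no error occurred.

package main

import "fmt"

type aState uint8

const (
	noState aState = iota
	errorState
	newState
	readState
	writeState
	closedState
)

// Same shape as writerStates above: current state -> next state.
var transitions = []aState{
	noState:     newState,
	errorState:  newState,
	newState:    writeState,
	writeState:  closedState,
	closedState: newState,
}

func main() {
	s := newState
	for _, op := range []string{"first Write", "Close", "Reset"} {
		s = transitions[s] // what _State.next does when err == nil
		fmt.Printf("%s -> state %d\n", op, s)
	}
}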
@ -0,0 +1,28 @@ |
||||
// Code generated by "stringer -type=aState -output state_gen.go"; DO NOT EDIT.
|
||||
|
||||
package lz4 |
||||
|
||||
import "strconv" |
||||
|
||||
func _() { |
||||
// An "invalid array index" compiler error signifies that the constant values have changed.
|
||||
// Re-run the stringer command to generate them again.
|
||||
var x [1]struct{} |
||||
_ = x[noState-0] |
||||
_ = x[errorState-1] |
||||
_ = x[newState-2] |
||||
_ = x[readState-3] |
||||
_ = x[writeState-4] |
||||
_ = x[closedState-5] |
||||
} |
||||
|
||||
const _aState_name = "noStateerrorStatenewStatereadStatewriteStateclosedState" |
||||
|
||||
var _aState_index = [...]uint8{0, 7, 17, 25, 34, 44, 55} |
||||
|
||||
func (i aState) String() string { |
||||
if i >= aState(len(_aState_index)-1) { |
||||
return "aState(" + strconv.FormatInt(int64(i), 10) + ")" |
||||
} |
||||
return _aState_name[_aState_index[i]:_aState_index[i+1]] |
||||
} |
@ -0,0 +1,232 @@ |
||||
package lz4 |
||||
|
||||
import ( |
||||
"io" |
||||
|
||||
"github.com/pierrec/lz4/v4/internal/lz4block" |
||||
"github.com/pierrec/lz4/v4/internal/lz4errors" |
||||
"github.com/pierrec/lz4/v4/internal/lz4stream" |
||||
) |
||||
|
||||
var writerStates = []aState{ |
||||
noState: newState, |
||||
newState: writeState, |
||||
writeState: closedState, |
||||
closedState: newState, |
||||
errorState: newState, |
||||
} |
||||
|
||||
// NewWriter returns a new LZ4 frame encoder.
|
||||
func NewWriter(w io.Writer) *Writer { |
||||
zw := &Writer{frame: lz4stream.NewFrame()} |
||||
zw.state.init(writerStates) |
||||
_ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone) |
||||
zw.Reset(w) |
||||
return zw |
||||
} |
||||
|
||||
// Writer allows writing an LZ4 stream.
|
||||
type Writer struct { |
||||
state _State |
||||
src io.Writer // destination writer
|
||||
level lz4block.CompressionLevel // how hard to try
|
||||
num int // concurrency level
|
||||
frame *lz4stream.Frame // frame being built
|
||||
data []byte // pending data
|
||||
idx int // size of pending data
|
||||
handler func(int) |
||||
} |
||||
|
||||
func (*Writer) private() {} |
||||
|
||||
func (w *Writer) Apply(options ...Option) (err error) { |
||||
defer w.state.check(&err) |
||||
switch w.state.state { |
||||
case newState: |
||||
case errorState: |
||||
return w.state.err |
||||
default: |
||||
return lz4errors.ErrOptionClosedOrError |
||||
} |
||||
for _, o := range options { |
||||
if err = o(w); err != nil { |
||||
return |
||||
} |
||||
} |
||||
w.Reset(w.src) |
||||
return |
||||
} |
||||
|
||||
func (w *Writer) isNotConcurrent() bool { |
||||
return w.num == 1 |
||||
} |
||||
|
||||
// init sets up the Writer when in newState. It does not change the Writer state.
|
||||
func (w *Writer) init() error { |
||||
w.frame.InitW(w.src, w.num) |
||||
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
||||
w.data = size.Get() |
||||
w.idx = 0 |
||||
return w.frame.Descriptor.Write(w.frame, w.src) |
||||
} |
||||
|
||||
func (w *Writer) Write(buf []byte) (n int, err error) { |
||||
defer w.state.check(&err) |
||||
switch w.state.state { |
||||
case writeState: |
||||
case closedState, errorState: |
||||
return 0, w.state.err |
||||
case newState: |
||||
if err = w.init(); w.state.next(err) { |
||||
return |
||||
} |
||||
default: |
||||
return 0, w.state.fail() |
||||
} |
||||
|
||||
zn := len(w.data) |
||||
for len(buf) > 0 { |
||||
if w.idx == 0 && len(buf) >= zn { |
||||
// Avoid a copy as there is enough data for a block.
|
||||
if err = w.write(buf[:zn], false); err != nil { |
||||
return |
||||
} |
||||
n += zn |
||||
buf = buf[zn:] |
||||
continue |
||||
} |
||||
// Accumulate the data to be compressed.
|
||||
m := copy(w.data[w.idx:], buf) |
||||
n += m |
||||
w.idx += m |
||||
buf = buf[m:] |
||||
|
||||
if w.idx < len(w.data) { |
||||
// Buffer not filled.
|
||||
return |
||||
} |
||||
|
||||
// Buffer full.
|
||||
if err = w.write(w.data, true); err != nil { |
||||
return |
||||
} |
||||
if !w.isNotConcurrent() { |
||||
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
||||
w.data = size.Get() |
||||
} |
||||
w.idx = 0 |
||||
} |
||||
return |
||||
} |
||||
|
||||
func (w *Writer) write(data []byte, safe bool) error { |
||||
if w.isNotConcurrent() { |
||||
block := w.frame.Blocks.Block |
||||
err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src) |
||||
w.handler(len(block.Data)) |
||||
return err |
||||
} |
||||
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
||||
c := make(chan *lz4stream.FrameDataBlock) |
||||
w.frame.Blocks.Blocks <- c |
||||
go func(c chan *lz4stream.FrameDataBlock, data []byte, size lz4block.BlockSizeIndex, safe bool) { |
||||
b := lz4stream.NewFrameDataBlock(size) |
||||
c <- b.Compress(w.frame, data, w.level) |
||||
<-c |
||||
w.handler(len(b.Data)) |
||||
b.CloseW(w.frame) |
||||
if safe { |
||||
// safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
|
||||
size.Put(data) |
||||
} |
||||
}(c, data, size, safe) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
|
||||
// but does not close the underlying io.Writer.
|
||||
func (w *Writer) Close() (err error) { |
||||
switch w.state.state { |
||||
case writeState: |
||||
case errorState: |
||||
return w.state.err |
||||
default: |
||||
return nil |
||||
} |
||||
defer w.state.nextd(&err) |
||||
if w.idx > 0 { |
||||
// Flush pending data, disable w.data freeing as it is done later on.
|
||||
if err = w.write(w.data[:w.idx], false); err != nil { |
||||
return err |
||||
} |
||||
w.idx = 0 |
||||
} |
||||
err = w.frame.CloseW(w.src, w.num) |
||||
// It is now safe to free the buffer.
|
||||
if w.data != nil { |
||||
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
||||
size.Put(w.data) |
||||
w.data = nil |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Reset clears the state of the Writer w such that it is equivalent to its
|
||||
// initial state from NewWriter, but instead writing to writer.
|
||||
// Reset keeps the previous options unless overwritten by the supplied ones.
|
||||
// No access to writer is performed.
|
||||
//
|
||||
// w.Close must be called before Reset or pending data may be dropped.
|
||||
func (w *Writer) Reset(writer io.Writer) { |
||||
w.frame.Reset(w.num) |
||||
w.state.reset() |
||||
w.src = writer |
||||
} |
||||
|
||||
// ReadFrom efficiently reads from r and compresses the data into the Writer destination.
|
||||
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { |
||||
switch w.state.state { |
||||
case closedState, errorState: |
||||
return 0, w.state.err |
||||
case newState: |
||||
if err = w.init(); w.state.next(err) { |
||||
return |
||||
} |
||||
default: |
||||
return 0, w.state.fail() |
||||
} |
||||
defer w.state.check(&err) |
||||
|
||||
size := w.frame.Descriptor.Flags.BlockSizeIndex() |
||||
var done bool |
||||
var rn int |
||||
data := size.Get() |
||||
if w.isNotConcurrent() { |
||||
// Keep the same buffer for the whole process.
|
||||
defer size.Put(data) |
||||
} |
||||
for !done { |
||||
rn, err = io.ReadFull(r, data) |
||||
switch err { |
||||
case nil: |
||||
case io.EOF: |
||||
done = true |
||||
default: |
||||
return |
||||
} |
||||
n += int64(rn) |
||||
err = w.write(data[:rn], true) |
||||
if err != nil { |
||||
return |
||||
} |
||||
w.handler(rn) |
||||
if !done && !w.isNotConcurrent() { |
||||
// The buffer will be returned automatically by the goroutines (safe=true),
// so get a new one for the next round.
|
||||
data = size.Get() |
||||
} |
||||
} |
||||
err = w.Close() |
||||
return |
||||
} |
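A hedged end-to-end sketch (not part of this diff) for the Writer above; ReadFrom streams the source into the frame and, as the implementation shows, closes the frame itself once the source is exhausted. File names are placeholders.

package main

import (
	"os"

	"github.com/pierrec/lz4/v4"
)

// compressFile writes src compressed as an LZ4 frame to dst.
func compressFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer out.Close()

	zw := lz4.NewWriter(out)
	// ReadFrom flushes and closes the frame on io.EOF (see the Close call at the end of ReadFrom above).
	_, err = zw.ReadFrom(in)
	return err
}

func main() {
	if err := compressFile("data.txt", "data.lz4"); err != nil { // placeholder names
		panic(err)
	}
}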
@ -1,413 +0,0 @@ |
||||
package lz4 |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"runtime" |
||||
|
||||
"github.com/pierrec/lz4/internal/xxh32" |
||||
) |
||||
|
||||
// zResult contains the results of compressing a block.
|
||||
type zResult struct { |
||||
size uint32 // Block header
|
||||
data []byte // Compressed data
|
||||
checksum uint32 // Data checksum
|
||||
} |
||||
|
||||
// Writer implements the LZ4 frame encoder.
|
||||
type Writer struct { |
||||
Header |
||||
// Handler called when a block has been successfully written out.
|
||||
// It provides the number of bytes written.
|
||||
OnBlockDone func(size int) |
||||
|
||||
buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
|
||||
dst io.Writer // Destination.
|
||||
checksum xxh32.XXHZero // Frame checksum.
|
||||
data []byte // Data to be compressed + buffer for compressed data.
|
||||
idx int // Index into data.
|
||||
hashtable [winSize]int // Hash table used in CompressBlock().
|
||||
|
||||
// For concurrency.
|
||||
c chan chan zResult // Channel for block compression goroutines and writer goroutine.
|
||||
err error // Any error encountered while writing to the underlying destination.
|
||||
} |
||||
|
||||
// NewWriter returns a new LZ4 frame encoder.
|
||||
// No access to the underlying io.Writer is performed.
|
||||
// The supplied Header is checked at the first Write.
|
||||
// It is OK to change the Header before the first Write, but not afterwards until Reset() is performed.
|
||||
func NewWriter(dst io.Writer) *Writer { |
||||
z := new(Writer) |
||||
z.Reset(dst) |
||||
return z |
||||
} |
||||
|
||||
// WithConcurrency sets the number of concurrent goroutines used for compression.
|
||||
// A negative value sets the concurrency to GOMAXPROCS.
|
||||
func (z *Writer) WithConcurrency(n int) *Writer { |
||||
switch { |
||||
case n == 0 || n == 1: |
||||
z.c = nil |
||||
return z |
||||
case n < 0: |
||||
n = runtime.GOMAXPROCS(0) |
||||
} |
||||
z.c = make(chan chan zResult, n) |
||||
// Writer goroutine managing concurrent block compression goroutines.
|
||||
go func() { |
||||
// Process next block compression item.
|
||||
for c := range z.c { |
||||
// Read the next compressed block result.
|
||||
// Waiting here ensures that the blocks are output in the order they were sent.
|
||||
// The incoming channel is always closed as it indicates to the caller that
|
||||
// the block has been processed.
|
||||
res := <-c |
||||
n := len(res.data) |
||||
if n == 0 { |
||||
// Notify the block compression routine that we are done with its result.
|
||||
// This is used when a sentinel block is sent to terminate the compression.
|
||||
close(c) |
||||
return |
||||
} |
||||
// Write the block.
|
||||
if err := z.writeUint32(res.size); err != nil && z.err == nil { |
||||
z.err = err |
||||
} |
||||
if _, err := z.dst.Write(res.data); err != nil && z.err == nil { |
||||
z.err = err |
||||
} |
||||
if z.BlockChecksum { |
||||
if err := z.writeUint32(res.checksum); err != nil && z.err == nil { |
||||
z.err = err |
||||
} |
||||
} |
||||
if isCompressed := res.size&compressedBlockFlag == 0; isCompressed { |
||||
// It is now safe to release the buffer as no longer in use by any goroutine.
|
||||
putBuffer(cap(res.data), res.data) |
||||
} |
||||
if h := z.OnBlockDone; h != nil { |
||||
h(n) |
||||
} |
||||
close(c) |
||||
} |
||||
}() |
||||
return z |
||||
} |
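// Editorial illustration (not in the original file): the v3 API deleted by this change
// was typically driven as below; dst, src and the logging are placeholders.
//
//	z := lz4.NewWriter(dst).WithConcurrency(runtime.GOMAXPROCS(0))
//	z.Header.BlockMaxSize = 4 << 20 // 4Mb blocks
//	z.OnBlockDone = func(n int) { log.Printf("block written: %d bytes", n) }
//	if _, err := io.Copy(z, src); err != nil {
//		return err
//	}
//	return z.Close()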
||||
|
||||
// newBuffers instantiates new buffers whose size matches the one in Header.
|
||||
// The returned buffers are for decompression and compression respectively.
|
||||
func (z *Writer) newBuffers() { |
||||
bSize := z.Header.BlockMaxSize |
||||
buf := getBuffer(bSize) |
||||
z.data = buf[:bSize] // Uncompressed buffer is the first half.
|
||||
} |
||||
|
||||
// freeBuffers puts the writer's buffers back to the pool.
|
||||
func (z *Writer) freeBuffers() { |
||||
// Put the buffer back into the pool, if any.
|
||||
putBuffer(z.Header.BlockMaxSize, z.data) |
||||
z.data = nil |
||||
} |
||||
|
||||
// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
|
||||
func (z *Writer) writeHeader() error { |
||||
// Default to 4Mb if BlockMaxSize is not set.
|
||||
if z.Header.BlockMaxSize == 0 { |
||||
z.Header.BlockMaxSize = blockSize4M |
||||
} |
||||
// The only option that needs to be validated.
|
||||
bSize := z.Header.BlockMaxSize |
||||
if !isValidBlockSize(z.Header.BlockMaxSize) { |
||||
return fmt.Errorf("lz4: invalid block max size: %d", bSize) |
||||
} |
||||
// Allocate the compressed/uncompressed buffers.
|
||||
// The compressed buffer cannot exceed the uncompressed one.
|
||||
z.newBuffers() |
||||
z.idx = 0 |
||||
|
||||
// Size is optional.
|
||||
buf := z.buf[:] |
||||
|
||||
// Set the fixed size data: magic number, block max size and flags.
|
||||
binary.LittleEndian.PutUint32(buf[0:], frameMagic) |
||||
flg := byte(Version << 6) |
||||
flg |= 1 << 5 // No block dependency.
|
||||
if z.Header.BlockChecksum { |
||||
flg |= 1 << 4 |
||||
} |
||||
if z.Header.Size > 0 { |
||||
flg |= 1 << 3 |
||||
} |
||||
if !z.Header.NoChecksum { |
||||
flg |= 1 << 2 |
||||
} |
||||
buf[4] = flg |
||||
buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4 |
||||
|
||||
// Current buffer size: magic(4) + flags(1) + block max size (1).
|
||||
n := 6 |
||||
// Optional items.
|
||||
if z.Header.Size > 0 { |
||||
binary.LittleEndian.PutUint64(buf[n:], z.Header.Size) |
||||
n += 8 |
||||
} |
||||
|
||||
// The header checksum includes the flags, block max size and optional Size.
|
||||
buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF) |
||||
z.checksum.Reset() |
||||
|
||||
// Header ready, write it out.
|
||||
if _, err := z.dst.Write(buf[0 : n+1]); err != nil { |
||||
return err |
||||
} |
||||
z.Header.done = true |
||||
if debugFlag { |
||||
debug("wrote header %v", z.Header) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Write compresses data from the supplied buffer into the underlying io.Writer.
|
||||
// Write does not return until the data has been written.
|
||||
func (z *Writer) Write(buf []byte) (int, error) { |
||||
if !z.Header.done { |
||||
if err := z.writeHeader(); err != nil { |
||||
return 0, err |
||||
} |
||||
} |
||||
if debugFlag { |
||||
debug("input buffer len=%d index=%d", len(buf), z.idx) |
||||
} |
||||
|
||||
zn := len(z.data) |
||||
var n int |
||||
for len(buf) > 0 { |
||||
if z.idx == 0 && len(buf) >= zn { |
||||
// Avoid a copy as there is enough data for a block.
|
||||
if err := z.compressBlock(buf[:zn]); err != nil { |
||||
return n, err |
||||
} |
||||
n += zn |
||||
buf = buf[zn:] |
||||
continue |
||||
} |
||||
// Accumulate the data to be compressed.
|
||||
m := copy(z.data[z.idx:], buf) |
||||
n += m |
||||
z.idx += m |
||||
buf = buf[m:] |
||||
if debugFlag { |
||||
debug("%d bytes copied to buf, current index %d", n, z.idx) |
||||
} |
||||
|
||||
if z.idx < len(z.data) { |
||||
// Buffer not filled.
|
||||
if debugFlag { |
||||
debug("need more data for compression") |
||||
} |
||||
return n, nil |
||||
} |
||||
|
||||
// Buffer full.
|
||||
if err := z.compressBlock(z.data); err != nil { |
||||
return n, err |
||||
} |
||||
z.idx = 0 |
||||
} |
||||
|
||||
return n, nil |
||||
} |
||||
|
||||
// compressBlock compresses a block.
|
||||
func (z *Writer) compressBlock(data []byte) error { |
||||
if !z.NoChecksum { |
||||
_, _ = z.checksum.Write(data) |
||||
} |
||||
|
||||
if z.c != nil { |
||||
c := make(chan zResult) |
||||
z.c <- c // Send now to guarantee order
|
||||
go writerCompressBlock(c, z.Header, data) |
||||
return nil |
||||
} |
||||
|
||||
zdata := z.data[z.Header.BlockMaxSize:cap(z.data)] |
||||
// The compressed block size cannot exceed the input's.
|
||||
var zn int |
||||
|
||||
if level := z.Header.CompressionLevel; level != 0 { |
||||
zn, _ = CompressBlockHC(data, zdata, level) |
||||
} else { |
||||
zn, _ = CompressBlock(data, zdata, z.hashtable[:]) |
||||
} |
||||
|
||||
var bLen uint32 |
||||
if debugFlag { |
||||
debug("block compression %d => %d", len(data), zn) |
||||
} |
||||
if zn > 0 && zn < len(data) { |
||||
// Compressible and compressed size smaller than uncompressed: ok!
|
||||
bLen = uint32(zn) |
||||
zdata = zdata[:zn] |
||||
} else { |
||||
// Uncompressed block.
|
||||
bLen = uint32(len(data)) | compressedBlockFlag |
||||
zdata = data |
||||
} |
||||
if debugFlag { |
||||
debug("block compression to be written len=%d data len=%d", bLen, len(zdata)) |
||||
} |
||||
|
||||
// Write the block.
|
||||
if err := z.writeUint32(bLen); err != nil { |
||||
return err |
||||
} |
||||
written, err := z.dst.Write(zdata) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if h := z.OnBlockDone; h != nil { |
||||
h(written) |
||||
} |
||||
|
||||
if !z.BlockChecksum { |
||||
if debugFlag { |
||||
debug("current frame checksum %x", z.checksum.Sum32()) |
||||
} |
||||
return nil |
||||
} |
||||
checksum := xxh32.ChecksumZero(zdata) |
||||
if debugFlag { |
||||
debug("block checksum %x", checksum) |
||||
defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }() |
||||
} |
||||
return z.writeUint32(checksum) |
||||
} |
||||
|
||||
// Flush flushes any pending compressed data to the underlying writer.
|
||||
// Flush does not return until the data has been written.
|
||||
// If the underlying writer returns an error, Flush returns that error.
|
||||
func (z *Writer) Flush() error { |
||||
if debugFlag { |
||||
debug("flush with index %d", z.idx) |
||||
} |
||||
if z.idx == 0 { |
||||
return nil |
||||
} |
||||
|
||||
data := z.data[:z.idx] |
||||
z.idx = 0 |
||||
if z.c == nil { |
||||
return z.compressBlock(data) |
||||
} |
||||
if !z.NoChecksum { |
||||
_, _ = z.checksum.Write(data) |
||||
} |
||||
c := make(chan zResult) |
||||
z.c <- c |
||||
writerCompressBlock(c, z.Header, data) |
||||
return nil |
||||
} |
||||
|
||||
func (z *Writer) close() error { |
||||
if z.c == nil { |
||||
return nil |
||||
} |
||||
// Send a sentinel block (no data to compress) to terminate the writer main goroutine.
|
||||
c := make(chan zResult) |
||||
z.c <- c |
||||
c <- zResult{} |
||||
// Wait for the main goroutine to complete.
|
||||
<-c |
||||
// At this point the main goroutine has shut down or is about to return.
|
||||
z.c = nil |
||||
return z.err |
||||
} |
||||
|
||||
// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
|
||||
func (z *Writer) Close() error { |
||||
if !z.Header.done { |
||||
if err := z.writeHeader(); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
if err := z.Flush(); err != nil { |
||||
return err |
||||
} |
||||
if err := z.close(); err != nil { |
||||
return err |
||||
} |
||||
z.freeBuffers() |
||||
|
||||
if debugFlag { |
||||
debug("writing last empty block") |
||||
} |
||||
if err := z.writeUint32(0); err != nil { |
||||
return err |
||||
} |
||||
if z.NoChecksum { |
||||
return nil |
||||
} |
||||
checksum := z.checksum.Sum32() |
||||
if debugFlag { |
||||
debug("stream checksum %x", checksum) |
||||
} |
||||
return z.writeUint32(checksum) |
||||
} |
||||
|
||||
// Reset clears the state of the Writer z such that it is equivalent to its
|
||||
// initial state from NewWriter, but instead writing to w.
|
||||
// No access to the underlying io.Writer is performed.
|
||||
func (z *Writer) Reset(w io.Writer) { |
||||
n := cap(z.c) |
||||
_ = z.close() |
||||
z.freeBuffers() |
||||
z.Header.Reset() |
||||
z.dst = w |
||||
z.checksum.Reset() |
||||
z.idx = 0 |
||||
z.err = nil |
||||
// reset hashtable to ensure deterministic output.
|
||||
for i := range z.hashtable { |
||||
z.hashtable[i] = 0 |
||||
} |
||||
z.WithConcurrency(n) |
||||
} |
||||
|
||||
// writeUint32 writes a uint32 to the underlying writer.
|
||||
func (z *Writer) writeUint32(x uint32) error { |
||||
buf := z.buf[:4] |
||||
binary.LittleEndian.PutUint32(buf, x) |
||||
_, err := z.dst.Write(buf) |
||||
return err |
||||
} |
||||
|
||||
// writerCompressBlock compresses data into a pooled buffer and writes its result
|
||||
// out to the input channel.
|
||||
func writerCompressBlock(c chan zResult, header Header, data []byte) { |
||||
zdata := getBuffer(header.BlockMaxSize) |
||||
// The compressed block size cannot exceed the input's.
|
||||
var zn int |
||||
if level := header.CompressionLevel; level != 0 { |
||||
zn, _ = CompressBlockHC(data, zdata, level) |
||||
} else { |
||||
var hashTable [winSize]int |
||||
zn, _ = CompressBlock(data, zdata, hashTable[:]) |
||||
} |
||||
var res zResult |
||||
if zn > 0 && zn < len(data) { |
||||
res.size = uint32(zn) |
||||
res.data = zdata[:zn] |
||||
} else { |
||||
res.size = uint32(len(data)) | compressedBlockFlag |
||||
res.data = data |
||||
} |
||||
if header.BlockChecksum { |
||||
res.checksum = xxh32.ChecksumZero(res.data) |
||||
} |
||||
c <- res |
||||
} |