Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/internal/dataset/value_encoding_plain.go

140 lines
3.6 KiB

package dataset
import (
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
func init() {
// Register the encoding so instances of it can be dynamically created.
registerValueEncoding(
datasetmd.PHYSICAL_TYPE_BINARY,
datasetmd.ENCODING_TYPE_PLAIN,
func(w streamio.Writer) valueEncoder { return newPlainBytesEncoder(w) },
func(r streamio.Reader) valueDecoder { return newPlainBytesDecoder(r) },
)
}
// A plainBytesEncoder encodes byte array values to an [streamio.Writer].
type plainBytesEncoder struct {
w streamio.Writer
}
var _ valueEncoder = (*plainBytesEncoder)(nil)
// newPlainEncoder creates a plainEncoder that writes encoded strings to w.
func newPlainBytesEncoder(w streamio.Writer) *plainBytesEncoder {
return &plainBytesEncoder{w: w}
}
// PhysicalType returns [datasetmd.PHYSICAL_TYPE_BINARY].
func (enc *plainBytesEncoder) PhysicalType() datasetmd.PhysicalType {
return datasetmd.PHYSICAL_TYPE_BINARY
}
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
func (enc *plainBytesEncoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_PLAIN
}
// Encode encodes an individual string value.
func (enc *plainBytesEncoder) Encode(v Value) error {
if v.Type() != datasetmd.PHYSICAL_TYPE_BINARY {
return fmt.Errorf("plain: invalid value type %v", v.Type())
}
sv := v.Binary()
if err := streamio.WriteUvarint(enc.w, uint64(len(sv))); err != nil {
return err
}
n, err := enc.w.Write(sv)
if n != len(sv) {
return fmt.Errorf("short write; expected %d bytes, wrote %d", len(sv), n)
}
return err
}
// Flush implements [valueEncoder]. It is a no-op for plainEncoder.
func (enc *plainBytesEncoder) Flush() error {
return nil
}
// Reset implements [valueEncoder]. It resets the encoder to write to w.
func (enc *plainBytesEncoder) Reset(w streamio.Writer) {
enc.w = w
}
// plainBytesDecoder decodes byte arrays from an [streamio.Reader].
type plainBytesDecoder struct {
r streamio.Reader
}
var _ valueDecoder = (*plainBytesDecoder)(nil)
// newPlainBytesDecoder creates a plainDecoder that reads encoded strings from r.
func newPlainBytesDecoder(r streamio.Reader) *plainBytesDecoder {
return &plainBytesDecoder{r: r}
}
// PhysicalType returns [datasetmd.PHYSICAL_TYPE_BINARY].
func (dec *plainBytesDecoder) PhysicalType() datasetmd.PhysicalType {
return datasetmd.PHYSICAL_TYPE_BINARY
}
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
func (dec *plainBytesDecoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_PLAIN
}
// Decode decodes up to len(s) values, storing the results into s. The
// number of decoded values is returned, followed by an error (if any).
// At the end of the stream, Decode returns 0, [io.EOF].
func (dec *plainBytesDecoder) Decode(s []Value) (int, error) {
if len(s) == 0 {
return 0, nil
}
var err error
for i := range s {
err = dec.decode(&s[i])
if errors.Is(err, io.EOF) {
if i == 0 {
return 0, io.EOF
}
return i, nil
} else if err != nil {
return i, err
}
}
return len(s), nil
}
// decode decodes a string.
func (dec *plainBytesDecoder) decode(v *Value) error {
sz, err := binary.ReadUvarint(dec.r)
if err != nil {
return err
}
dst := slicegrow.GrowToCap(v.Buffer(), int(sz))
dst = dst[:sz]
if _, err := io.ReadFull(dec.r, dst); err != nil {
return err
}
*v = BinaryValue(dst)
return nil
}
// Reset implements [valueDecoder]. It resets the decoder to read from r.
func (dec *plainBytesDecoder) Reset(r streamio.Reader) {
dec.r = r
}