Lazily decode series protobuf. (#10071)

**What this PR does / why we need it**:
The protobuf decoding for series responses runs into a memory issue for
many series. The response is only merged and passed through the front
end. It is more efficient to decode the protobuf encoded series lazily.
A `decode`, `merge`, `encode` benchmark demonstrates the benefit of
transcoding the protobuf series message into a JSON response directly.

This change will not impact production code since it only applies to
protobuf encoding messaging between the querier and query frontend that
must be explicitly enabled.

```
› go test -v -run=^$ -bench "Benchmark_DecodeMergeEncodeCycle" -memprofile memory_base.prof -count=10 ./pkg/querier/queryrange > before.txt
› go test -v -run=^$ -bench "Benchmark_DecodeMergeEncodeCycle" -memprofile memory_after.prof -count=10 ./pkg/querier/queryrange > after.txt
› benchstat before.txt after.txt
before.txt:5: missing iteration count
after.txt:5: missing iteration count
goos: linux
goarch: amd64
pkg: github.com/grafana/loki/pkg/querier/queryrange
cpu: AMD Ryzen 7 3700X 8-Core Processor             
                           │  before.txt  │              after.txt              │
                           │    sec/op    │   sec/op     vs base                │
_DecodeMergeEncodeCycle-16   2537.7m ± 2%   934.2m ± 1%  -63.19% (p=0.000 n=10)

                           │  before.txt   │              after.txt               │
                           │     B/op      │     B/op      vs base                │
_DecodeMergeEncodeCycle-16   1723.4Mi ± 0%   641.1Mi ± 0%  -62.80% (p=0.000 n=10)

                           │  before.txt   │              after.txt              │
                           │   allocs/op   │  allocs/op   vs base                │
_DecodeMergeEncodeCycle-16   20240.6k ± 0%   203.0k ± 0%  -99.00% (p=0.000 n=10)

```

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
pull/10131/head
Karsten Jeschkies 3 years ago committed by GitHub
parent ad70b5d54b
commit fbcaa1d5d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      go.mod
  2. 3
      go.sum
  3. 1
      pkg/logproto/extensions.go
  4. 28
      pkg/querier/queryrange/codec.go
  5. 51
      pkg/querier/queryrange/codec_test.go
  6. 355
      pkg/querier/queryrange/views.go
  7. 326
      pkg/querier/queryrange/views_test.go
  8. 21
      vendor/github.com/richardartoul/molecule/LICENSE
  9. 8
      vendor/github.com/richardartoul/molecule/Makefile
  10. 44
      vendor/github.com/richardartoul/molecule/README.md
  11. 45
      vendor/github.com/richardartoul/molecule/doc.go
  12. 166
      vendor/github.com/richardartoul/molecule/molecule.go
  13. 202
      vendor/github.com/richardartoul/molecule/src/codec/LICENSE
  14. 5
      vendor/github.com/richardartoul/molecule/src/codec/README.md
  15. 84
      vendor/github.com/richardartoul/molecule/src/codec/buffer.go
  16. 42
      vendor/github.com/richardartoul/molecule/src/codec/constants.go
  17. 482
      vendor/github.com/richardartoul/molecule/src/codec/decode.go
  18. 27
      vendor/github.com/richardartoul/molecule/src/protowire/LICENSE
  19. 137
      vendor/github.com/richardartoul/molecule/src/protowire/wire.go
  20. 506
      vendor/github.com/richardartoul/molecule/stream.go
  21. 154
      vendor/github.com/richardartoul/molecule/value.go
  22. 5
      vendor/modules.txt

@ -123,6 +123,7 @@ require (
github.com/heroku/x v0.0.59
github.com/prometheus/alertmanager v0.25.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204
github.com/willf/bloom v2.0.3+incompatible
go4.org/netipx v0.0.0-20230125063823-8449b0a6169f

@ -1001,6 +1001,7 @@ github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
@ -1581,6 +1582,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU=
github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=

@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)
// This is the separator defined in the Prometheus Labels.Hash function.
var seps = []byte{'\xff'}
// Hash returns hash of the labels according to Prometheus' Labels.Hash function.

@ -471,7 +471,7 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
}
return req.WithContext(ctx), nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
return nil, httpgrpc.Errorf(http.StatusInternalServerError, fmt.Sprintf("invalid request format, got (%T)", r))
}
}
@ -654,6 +654,11 @@ func decodeResponseProtobuf(r *http.Response, req queryrangebase.Request) (query
}
}
// Shortcut series responses without deserialization.
if _, ok := req.(*LokiSeriesRequest); ok {
return GetLokiSeriesResponseView(buf)
}
resp := &QueryResponse{}
err = resp.Unmarshal(buf)
if err != nil {
@ -722,7 +727,10 @@ func encodeResponseJSON(ctx context.Context, version loghttp.Version, res queryr
return nil, err
}
}
case *MergedSeriesResponseView:
if err := WriteSeriesResponseViewJSON(response, &buf); err != nil {
return nil, err
}
case *LokiSeriesResponse:
result := logproto.SeriesResponse{
Series: response.Data,
@ -749,7 +757,7 @@ func encodeResponseJSON(ctx context.Context, version loghttp.Version, res queryr
return nil, err
}
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
return nil, httpgrpc.Errorf(http.StatusInternalServerError, fmt.Sprintf("invalid response formatt, got (%T)", res))
}
sp.LogFields(otlog.Int("bytes", buf.Len()))
@ -777,6 +785,12 @@ func encodeResponseProtobuf(ctx context.Context, res queryrangebase.Response) (*
p.Response = &QueryResponse_Streams{response}
case *LokiSeriesResponse:
p.Response = &QueryResponse_Series{response}
case *MergedSeriesResponseView:
mat, err := response.Materialize()
if err != nil {
return nil, err
}
p.Response = &QueryResponse_Series{mat}
case *LokiLabelNamesResponse:
p.Response = &QueryResponse_Labels{response}
case *IndexStatsResponse:
@ -784,7 +798,7 @@ func encodeResponseProtobuf(ctx context.Context, res queryrangebase.Response) (*
case *TopKSketchesResponse:
p.Response = &QueryResponse_TopkSketches{response}
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
return nil, httpgrpc.Errorf(http.StatusInternalServerError, fmt.Sprintf("invalid response format, got (%T)", res))
}
buf, err := p.Marshal()
@ -872,6 +886,12 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
Data: lokiSeriesData,
Statistics: mergedStats,
}, nil
case *LokiSeriesResponseView:
v := &MergedSeriesResponseView{}
for _, r := range responses {
v.responses = append(v.responses, r.(*LokiSeriesResponseView))
}
return v, nil
case *LokiLabelNamesResponse:
labelNameRes := responses[0].(*LokiLabelNamesResponse)
uniqueNames := make(map[string]struct{})

@ -1784,11 +1784,60 @@ func Benchmark_CodecDecodeSamples(b *testing.B) {
Direction: logproto.BACKWARD,
Path: u.String(),
})
require.Nil(b, err)
require.NoError(b, err)
require.NotNil(b, result)
}
}
// Benchmark_CodecDecodeSeries measures decoding of a series response for both
// the protobuf and the JSON content type. The encoded payload is prepared once
// per accept type and rewound for every iteration.
func Benchmark_CodecDecodeSeries(b *testing.B) {
	ctx := context.Background()

	benchmarks := []struct {
		accept string
	}{
		{accept: ProtobufType},
		{accept: JSONType},
	}
	for _, bm := range benchmarks {
		u := &url.URL{Path: "/loki/api/v1/series"}
		req := &http.Request{
			Method:     "GET",
			RequestURI: u.String(), // This is what the httpgrpc code looks at.
			URL:        u,
			Header: http.Header{
				"Accept": []string{bm.accept},
			},
		}

		resp, err := DefaultCodec.EncodeResponse(ctx, req, &LokiSeriesResponse{
			Status:     "200",
			Version:    1,
			Statistics: stats.Result{},
			Data:       generateSeries(),
		})
		require.NoError(b, err)

		buf, err := io.ReadAll(resp.Body)
		require.NoError(b, err)
		reader := bytes.NewReader(buf)
		resp.Body = io.NopCloser(reader)

		b.Run(bm.accept, func(b *testing.B) {
			b.ResetTimer()
			b.ReportAllocs()
			for n := 0; n < b.N; n++ {
				// Rewind the shared body so every iteration decodes the same payload.
				_, _ = reader.Seek(0, io.SeekStart)
				result, err := DefaultCodec.DecodeResponse(ctx, resp, &LokiSeriesRequest{
					StartTs: start,
					EndTs:   end,
					Path:    u.String(),
				})
				require.NoError(b, err)
				require.NotNil(b, result)
			}
		})
	}
}
func Benchmark_MergeResponses(b *testing.B) {
var responses []queryrangebase.Response = make([]queryrangebase.Response, 100)
for i := range responses {

@ -0,0 +1,355 @@
package queryrange
import (
"fmt"
"io"
"sort"
"github.com/cespare/xxhash/v2"
"github.com/gogo/protobuf/protoc-gen-gogo/descriptor"
jsoniter "github.com/json-iterator/go"
"github.com/richardartoul/molecule"
"github.com/richardartoul/molecule/src/codec"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)
// Pull field numbers from protobuf message descriptions. Resolving them at
// startup instead of hard-coding the numbers keeps the lazy decoder in sync
// with the generated protobuf definitions.
var (
	queryResponse               *QueryResponse
	_, queryResponseDescription = descriptor.ForMessage(queryResponse)
	// Field number of QueryResponse.series, i.e. the LokiSeriesResponse payload.
	seriesResponseFieldNumber = queryResponseDescription.GetFieldDescriptor("series").GetNumber()

	seriesResponse               *LokiSeriesResponse
	_, seriesResponseDescription = descriptor.ForMessage(seriesResponse)
	// Field number of LokiSeriesResponse.Data, the repeated SeriesIdentifier field.
	dataFieldNumber = seriesResponseDescription.GetFieldDescriptor("Data").GetNumber()

	seriesIdentifier               *logproto.SeriesIdentifier
	_, seriesIdentifierDescription = descriptor.ForMessage(seriesIdentifier)
	// Field number of SeriesIdentifier.labels, the name/value map entries.
	labelsFieldNumber = seriesIdentifierDescription.GetFieldDescriptor("labels").GetNumber()
)
// GetLokiSeriesResponseView returns a view on the series response of a
// QueryResponse. Returns an error if the message was empty. Note: the method
// does not verify that the reply is a properly encoded QueryResponse protobuf.
func GetLokiSeriesResponseView(data []byte) (view *LokiSeriesResponseView, err error) {
	buf := codec.NewBuffer(data)
	err = molecule.MessageEach(buf, func(fieldNum int32, value molecule.Value) (bool, error) {
		if fieldNum != seriesResponseFieldNumber || len(value.Bytes) == 0 {
			return true, nil
		}
		// We might be able to avoid an allocation and copy here by using
		// value.Bytes directly.
		raw, safeErr := value.AsBytesSafe()
		if safeErr != nil {
			return false, fmt.Errorf("could not allocate message bytes: %w", safeErr)
		}
		view = &LokiSeriesResponseView{buffer: raw}
		return true, nil
	})
	if err == nil && view == nil {
		err = fmt.Errorf("loki series response message was empty")
	}
	return view, err
}
// LokiSeriesResponseView holds the raw bytes of a LokiSeriesResponse protobuf
// message. It is decoded lazily via ForEachSeries.
type LokiSeriesResponseView struct {
	// buffer is the still-encoded LokiSeriesResponse protobuf payload.
	buffer  []byte
	headers []*queryrangebase.PrometheusResponseHeader
}

// Compile-time check that the view satisfies the queryrangebase.Response interface.
var _ queryrangebase.Response = &LokiSeriesResponseView{}

// GetHeaders returns the HTTP headers attached to this response.
func (v *LokiSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
	return v.headers
}

// Implement proto.Message. The view is never marshaled through these methods;
// they only exist so the type can be passed where a proto.Message is expected.
func (v *LokiSeriesResponseView) Reset()         {}
func (v *LokiSeriesResponseView) String() string { return "" }
func (v *LokiSeriesResponseView) ProtoMessage()  {}
// ForEachSeries iterates over the []logproto.SeriesIdentifier slice and passes
// a view on each identifier to the supplied callback.
func (v *LokiSeriesResponseView) ForEachSeries(fn func(view *SeriesIdentifierView) error) error {
	buf := codec.NewBuffer(v.buffer)
	return molecule.MessageEach(buf, func(fieldNum int32, value molecule.Value) (bool, error) {
		if fieldNum != dataFieldNumber {
			return true, nil
		}
		raw, err := value.AsBytesUnsafe()
		if err != nil {
			return false, err
		}
		if err := fn(&SeriesIdentifierView{buffer: raw}); err != nil {
			return false, err
		}
		return true, nil
	})
}
// SeriesIdentifierView holds the raw bytes of a logproto.SeriesIdentifier
// protobuf message.
type SeriesIdentifierView struct {
	// buffer is the still-encoded SeriesIdentifier protobuf payload.
	buffer []byte
}
// ForEachLabel iterates over each name-value label pair of the identifier map.
// Note: the strings passed to the supplied callback are unsafe views on the
// underlying data.
func (v *SeriesIdentifierView) ForEachLabel(fn func(string, string) error) error {
	// Scratch space reused for every map entry to avoid per-entry allocations.
	pair := make([]string, 0, 2)
	return molecule.MessageEach(codec.NewBuffer(v.buffer), func(fieldNum int32, data molecule.Value) (bool, error) {
		if fieldNum == 1 {
			entry, err := data.AsBytesUnsafe()
			if err != nil {
				return false, err
			}
			// A protobuf map entry is a nested message: key is field 1,
			// value is field 2. Collect both into pair.
			err = molecule.MessageEach(codec.NewBuffer(entry), func(fieldNum int32, labelOrKey molecule.Value) (bool, error) {
				s, err := labelOrKey.AsStringUnsafe()
				if err != nil {
					return false, err
				}
				pair = append(pair, s)
				return true, nil
			})
			if err != nil {
				return false, err
			}
			if len(pair) != 2 {
				// Fixed typo in the error message: "go" -> "got".
				return false, fmt.Errorf("unexpected label pair length, got (%d), want (2)", len(pair))
			}
			err = fn(pair[0], pair[1])
			if err != nil {
				return false, err
			}
			pair = pair[:0]
		}
		return true, nil
	})
}
// This is the separator defined in the Prometheus Labels.Hash function.
var sep = string([]byte{'\xff'})
// HashFast is a faster version of the Hash method that uses an unsafe string of
// the name value label pairs. It does not have to allocate strings and is not
// using the separator. Thus it is not equivalent to the original Prometheus
// label hash function.
func (v *SeriesIdentifierView) HashFast(b []byte, keyLabelPairs []string) (uint64, []string, error) {
	keyLabelPairs = keyLabelPairs[:0]
	err := molecule.MessageEach(codec.NewBuffer(v.buffer), func(fieldNum int32, data molecule.Value) (bool, error) {
		if fieldNum == 1 {
			// Hash each raw map entry (key and value still protobuf-encoded)
			// as-is; no per-label string allocation is needed.
			entry, err := data.AsStringUnsafe()
			if err != nil {
				return false, err
			}
			keyLabelPairs = append(keyLabelPairs, entry)
			return true, err
		}
		return true, nil
	})
	if err != nil {
		return 0, nil, err
	}

	// Sort so the hash is independent of protobuf map entry order.
	sort.Strings(keyLabelPairs)

	// Use xxhash.Sum64(b) for fast path as it's faster.
	b = b[:0]
	for i, pair := range keyLabelPairs {
		if len(b)+len(pair) >= cap(b) {
			// If labels entry is 1KB+ do not allocate whole entry.
			h := xxhash.New()
			_, _ = h.Write(b)
			for _, pair := range keyLabelPairs[i:] {
				_, _ = h.WriteString(pair)
			}
			return h.Sum64(), keyLabelPairs, nil
		}
		b = append(b, pair...)
	}
	return xxhash.Sum64(b), keyLabelPairs, nil
}
// Hash is adapted from SeriesIdentifier.Hash and produces the same hash for the
// same input as the original Prometheus hash method.
func (v *SeriesIdentifierView) Hash(b []byte, keyLabelPairs []string) (uint64, []string, error) {
	keyLabelPairs = keyLabelPairs[:0]
	err := v.ForEachLabel(func(name, value string) error {
		// Join name and value with the Prometheus '\xff' separator so the
		// digest matches SeriesIdentifier.Hash exactly.
		pair := name + sep + value + sep
		keyLabelPairs = append(keyLabelPairs, pair)
		return nil
	})
	if err != nil {
		return 0, nil, err
	}

	// Sort so the hash is independent of protobuf map entry order.
	sort.Strings(keyLabelPairs)

	// Use xxhash.Sum64(b) for fast path as it's faster.
	b = b[:0]
	for i, pair := range keyLabelPairs {
		if len(b)+len(pair) >= cap(b) {
			// If labels entry is 1KB+ do not allocate whole entry.
			h := xxhash.New()
			_, _ = h.Write(b)
			for _, pair := range keyLabelPairs[i:] {
				_, _ = h.WriteString(pair)
			}
			return h.Sum64(), keyLabelPairs, nil
		}
		b = append(b, pair...)
	}
	return xxhash.Sum64(b), keyLabelPairs, nil
}
// MergedSeriesResponseView holds references to all series responses that should
// be merged before serialization to JSON. The de-duplication happens during the
// ForEachUniqueSeries iteration.
type MergedSeriesResponseView struct {
	responses []*LokiSeriesResponseView
	headers   []*queryrangebase.PrometheusResponseHeader
}

// Compile-time check that the merged view satisfies queryrangebase.Response.
var _ queryrangebase.Response = &MergedSeriesResponseView{}

// GetHeaders returns the HTTP headers attached to this response.
func (v *MergedSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
	return v.headers
}

// Implement proto.Message. The view is never marshaled through these methods;
// they only exist so the type can be passed where a proto.Message is expected.
func (v *MergedSeriesResponseView) Reset()         {}
func (v *MergedSeriesResponseView) String() string { return "" }
func (v *MergedSeriesResponseView) ProtoMessage()  {}
// ForEachUniqueSeries iterates over all unique series identifiers of all series
// responses. It uses the HashFast method before passing the identifier view to
// the supplied callback.
func (v *MergedSeriesResponseView) ForEachUniqueSeries(fn func(*SeriesIdentifierView) error) error {
	seen := make(map[uint64]struct{})
	// Scratch buffers shared across responses to keep hashing allocation-free.
	hashBuf := make([]byte, 0, 1024)
	pairBuf := make([]string, 0, 32)

	for _, response := range v.responses {
		walkErr := response.ForEachSeries(func(series *SeriesIdentifierView) error {
			key, pairs, hashErr := series.HashFast(hashBuf, pairBuf)
			if hashErr != nil {
				return hashErr
			}
			pairBuf = pairs
			if _, duplicate := seen[key]; duplicate {
				return nil
			}
			if cbErr := fn(series); cbErr != nil {
				return cbErr
			}
			seen[key] = struct{}{}
			return nil
		})
		if walkErr != nil {
			return walkErr
		}
	}
	return nil
}
// Materialize produces a LokiSeriesResponse instance that is a deserialized
// protobuf message.
func (v *MergedSeriesResponseView) Materialize() (*LokiSeriesResponse, error) {
	mat := &LokiSeriesResponse{}
	err := v.ForEachUniqueSeries(func(series *SeriesIdentifierView) error {
		// Fully decode the label map for this series.
		lbls := make(map[string]string)
		if labelErr := series.ForEachLabel(func(name, value string) error {
			lbls[name] = value
			return nil
		}); labelErr != nil {
			return fmt.Errorf("error stepping through labels of series: %w", labelErr)
		}
		mat.Data = append(mat.Data, logproto.SeriesIdentifier{Labels: lbls})
		return nil
	})
	return mat, err
}
// WriteSeriesResponseViewJSON writes a JSON response to the supplied writer
// that is equivalent to marshal.WriteSeriesResponseJSON.
func WriteSeriesResponseViewJSON(v *MergedSeriesResponseView, w io.Writer) error {
	stream := jsoniter.ConfigFastest.BorrowStream(w)
	defer jsoniter.ConfigFastest.ReturnStream(stream)

	stream.WriteObjectStart()
	stream.WriteObjectField("status")
	stream.WriteString("success")
	stream.WriteMore()
	stream.WriteObjectField("data")
	stream.WriteArrayStart()

	firstSeries := true
	err := v.ForEachUniqueSeries(func(id *SeriesIdentifierView) error {
		if !firstSeries {
			stream.WriteMore()
		}
		firstSeries = false

		stream.WriteObjectStart()
		firstLabel := true
		if labelErr := id.ForEachLabel(func(name, value string) error {
			if !firstLabel {
				stream.WriteMore()
			}
			firstLabel = false
			stream.WriteObjectField(name)
			stream.WriteString(value)
			return nil
		}); labelErr != nil {
			return labelErr
		}
		stream.WriteObjectEnd()
		// Flush per series so the buffered stream does not grow unbounded.
		stream.Flush()
		return nil
	})
	if err != nil {
		return err
	}

	stream.WriteArrayEnd()
	stream.WriteObjectEnd()
	stream.WriteRaw("\n")
	return stream.Flush()
}

@ -0,0 +1,326 @@
package queryrange
import (
"bytes"
"context"
"io"
"net/http"
"net/url"
"strings"
"testing"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/util/marshal"
)
// TestGetLokiSeriesResponse checks that GetLokiSeriesResponseView finds the
// series payload inside a marshaled QueryResponse and that the returned view
// iterates over all labels of every series identifier.
func TestGetLokiSeriesResponse(t *testing.T) {
	p := QueryResponse{
		Response: &QueryResponse_Series{
			Series: &LokiSeriesResponse{
				Status: "success",
				Data: []logproto.SeriesIdentifier{
					{
						Labels: map[string]string{
							"foo": "bar",
							"baz": "woof",
						},
					},
				},
			}},
	}

	buf, err := p.Marshal()
	require.NoError(t, err)

	view, err := GetLokiSeriesResponseView(buf)
	require.NoError(t, err)

	// Collect name+value concatenations to assert all pairs were visited.
	actual := make([]string, 0)
	err = view.ForEachSeries(func(identifier *SeriesIdentifierView) error {
		return identifier.ForEachLabel(func(name, value string) error {
			actual = append(actual, name+value)
			return nil
		})
	})
	require.NoError(t, err)
	require.ElementsMatch(t, actual, []string{"foobar", "bazwoof"})
}
// TestSeriesIdentifierViewHash verifies that the lazy view's Hash method
// produces exactly the same digest as logproto.SeriesIdentifier.Hash on the
// materialized identifier.
func TestSeriesIdentifierViewHash(t *testing.T) {
	identifier := &logproto.SeriesIdentifier{
		Labels: map[string]string{
			"foo": "bar",
			"baz": "woof",
		},
	}

	data, err := identifier.Marshal()
	require.NoError(t, err)

	view := &SeriesIdentifierView{buffer: data}

	b := make([]byte, 0, 1024)
	keyLabelPairs := make([]string, 0)
	var actual uint64
	actual, keyLabelPairs, err = view.Hash(b, keyLabelPairs)
	require.NoError(t, err)
	// The pairs are joined with the Prometheus '\xff' separator.
	require.ElementsMatch(t, keyLabelPairs, []string{"baz\xffwoof\xff", "foo\xffbar\xff"})

	expected, _ := identifier.Hash(b, keyLabelPairs)
	require.Equal(t, expected, actual)
}
// TestSeriesIdentifierViewForEachLabel checks that every label name and value
// of a marshaled SeriesIdentifier is passed to the callback.
func TestSeriesIdentifierViewForEachLabel(t *testing.T) {
	identifier := &logproto.SeriesIdentifier{
		Labels: map[string]string{
			"foo": "bar",
			"baz": "woof",
		},
	}

	data, err := identifier.Marshal()
	require.NoError(t, err)

	view := &SeriesIdentifierView{buffer: data}

	s := make([]string, 0)
	err = view.ForEachLabel(func(name, value string) error {
		s = append(s, name, value)
		return nil
	})
	require.NoError(t, err)
	require.ElementsMatch(t, s, []string{"baz", "woof", "foo", "bar"})
}
// TestSeriesResponseViewForEach compares the hashes produced while iterating
// the lazy view against the hashes of the eagerly decoded response to ensure
// both code paths see the same series.
func TestSeriesResponseViewForEach(t *testing.T) {
	response := &LokiSeriesResponse{
		Data: []logproto.SeriesIdentifier{
			{
				Labels: map[string]string{"i": "1", "baz": "woof"},
			},
			{
				Labels: map[string]string{"i": "2", "foo": "bar"},
			},
		},
	}

	data, err := response.Marshal()
	require.NoError(t, err)

	view := &LokiSeriesResponseView{buffer: data}

	actualHashes := make([]uint64, 0)
	err = view.ForEachSeries(func(s *SeriesIdentifierView) error {
		b := make([]byte, 0, 1024)
		keyLabelPairs := make([]string, 0)
		hash, _, err := s.Hash(b, keyLabelPairs)
		if err != nil {
			return err
		}
		actualHashes = append(actualHashes, hash)
		return nil
	})
	require.NoError(t, err)

	expectedHashes := make([]uint64, 0)
	for _, id := range response.Data {
		b := make([]byte, 0, 1024)
		keyLabelPairs := make([]string, 0)
		hash, _ := id.Hash(b, keyLabelPairs)
		expectedHashes = append(expectedHashes, hash)
	}
	require.ElementsMatch(t, expectedHashes, actualHashes)
}
// TestMergedViewDeduplication feeds two responses that share one identical
// series into the merged view and expects the duplicate to be visited only
// once.
func TestMergedViewDeduplication(t *testing.T) {
	responses := []*LokiSeriesResponse{
		{
			Data: []logproto.SeriesIdentifier{
				{
					Labels: map[string]string{"i": "1", "baz": "woof"},
				},
				{
					Labels: map[string]string{"i": "2", "foo": "bar"},
				},
			},
		},
		{
			Data: []logproto.SeriesIdentifier{
				{
					Labels: map[string]string{"i": "3", "baz": "woof"},
				},
				{
					Labels: map[string]string{"i": "2", "foo": "bar"},
				},
			},
		},
	}

	view := &MergedSeriesResponseView{}
	for _, r := range responses {
		data, err := r.Marshal()
		require.NoError(t, err)
		view.responses = append(view.responses, &LokiSeriesResponseView{buffer: data, headers: nil})
	}

	count := 0
	err := view.ForEachUniqueSeries(func(s *SeriesIdentifierView) error {
		count++
		return nil
	})
	require.NoError(t, err)
	// Four series in total with one duplicate: three unique.
	require.Equal(t, 3, count)
}
// TestMergedViewMaterialize checks that Materialize deduplicates series and
// fully decodes the label maps into a LokiSeriesResponse.
func TestMergedViewMaterialize(t *testing.T) {
	responses := []*LokiSeriesResponse{
		{
			Data: []logproto.SeriesIdentifier{
				{
					Labels: map[string]string{"i": "1", "baz": "woof"},
				},
				{
					Labels: map[string]string{"i": "2", "foo": "bar"},
				},
			},
		},
		{
			Data: []logproto.SeriesIdentifier{
				{
					Labels: map[string]string{"i": "3", "baz": "woof"},
				},
				{
					Labels: map[string]string{"i": "2", "foo": "bar"},
				},
			},
		},
	}

	view := &MergedSeriesResponseView{}
	for _, r := range responses {
		data, err := r.Marshal()
		require.NoError(t, err)
		view.responses = append(view.responses, &LokiSeriesResponseView{buffer: data, headers: nil})
	}

	mat, err := view.Materialize()
	require.NoError(t, err)
	require.Len(t, mat.Data, 3)

	series := make([]string, 0)
	for _, d := range mat.Data {
		series = append(series, labels.FromMap(d.Labels).String())
	}
	expected := []string{`{baz="woof", i="1"}`, `{baz="woof", i="3"}`, `{foo="bar", i="2"}`}
	require.ElementsMatch(t, series, expected)
}
// TestMergedViewJSON asserts that the streaming JSON writer produces output
// equivalent to the reference marshal.WriteSeriesResponseJSON implementation.
func TestMergedViewJSON(t *testing.T) {
	var b strings.Builder

	response := &LokiSeriesResponse{
		Data: []logproto.SeriesIdentifier{
			{
				Labels: map[string]string{"i": "1", "baz": "woof"},
			},
			{
				Labels: map[string]string{"i": "2", "foo": "bar"},
			},
			{
				Labels: map[string]string{"i": "3", "baz": "woof"},
			},
		},
	}
	view := &MergedSeriesResponseView{}
	data, err := response.Marshal()
	require.NoError(t, err)
	view.responses = append(view.responses, &LokiSeriesResponseView{buffer: data, headers: nil})

	err = WriteSeriesResponseViewJSON(view, &b)
	require.NoError(t, err)
	actual := b.String()
	b.Reset()

	// Produce the expected JSON with the eager reference implementation.
	result := logproto.SeriesResponse{
		Series: response.Data,
	}
	err = marshal.WriteSeriesResponseJSON(result, &b)
	require.NoError(t, err)
	expected := b.String()
	require.JSONEq(t, expected, actual)
}
// Benchmark_DecodeMergeEncodeCycle simulates the query-frontend hot path for
// series requests: decode 100 protobuf-encoded querier responses, merge them,
// and encode the merged result as JSON for the client.
func Benchmark_DecodeMergeEncodeCycle(b *testing.B) {
	// Setup HTTP responses from querier with protobuf encoding.
	u := &url.URL{Path: "/loki/api/v1/series"}
	httpReq := &http.Request{
		Method:     "GET",
		RequestURI: u.String(),
		URL:        u,
		Header: http.Header{
			"Accept": []string{ProtobufType},
		},
	}
	qreq, err := DefaultCodec.DecodeRequest(context.Background(), httpReq, []string{})
	require.NoError(b, err)

	responses := make([]*LokiSeriesResponse, 100)
	for i := range responses {
		responses[i] = &LokiSeriesResponse{
			Status:     "200",
			Version:    1,
			Statistics: stats.Result{},
			Data:       generateSeries(),
		}
	}

	httpResponses := make([]*http.Response, 0)
	readers := make([]*bytes.Reader, 0)
	for _, r := range responses {
		resp, err := DefaultCodec.EncodeResponse(context.Background(), httpReq, r)
		require.NoError(b, err)
		buf, err := io.ReadAll(resp.Body)
		require.NoError(b, err)
		reader := bytes.NewReader(buf)
		resp.Body = io.NopCloser(reader)
		readers = append(readers, reader)
		httpResponses = append(httpResponses, resp)
	}

	// Originally the responses were encoded using protobuf. Demand JSON
	// now.
	httpReq.Header.Del("Accept")
	httpReq.Header.Add("Accept", JSONType)

	qresps := make([]queryrangebase.Response, 0, 100)
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Decode
		qresps = qresps[:0]
		for i, httpResp := range httpResponses {
			// Rewind the shared body so every iteration decodes the same payload.
			_, _ = readers[i].Seek(0, io.SeekStart)
			qresp, err := DefaultCodec.DecodeResponse(context.Background(), httpResp, qreq)
			require.NoError(b, err)
			qresps = append(qresps, qresp)
		}

		// Merge. The error was previously discarded; a failing merge would
		// have silently produced a meaningless benchmark.
		result, err := DefaultCodec.MergeResponse(qresps...)
		require.NoError(b, err)

		// Encode
		httpRes, err := DefaultCodec.EncodeResponse(context.Background(), httpReq, result)
		require.NoError(b, err)
		require.Equal(b, "application/json; charset=UTF-8", httpRes.Header.Get("Content-Type"))
	}
}

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Richard Artoul
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,8 @@
gen-proto:
rm -rf ./src/proto/gen
# Use gogo protobuf because the Google library fails very basic marshal/unmarshal fuzz tests
# which makes fuzz testing this library impossible.
docker run --rm -v `pwd`:`pwd` -w `pwd` znly/protoc --proto_path=./src/proto --gofast_out=./src/proto ./src/proto/simple.proto
test:
go test ./...

@ -0,0 +1,44 @@
[![GoDoc](https://godoc.org/github.com/richardartoul/molecule?status.png)](https://godoc.org/github.com/richardartoul/molecule)
[![C.I](https://github.com/richardartoul/molecule/workflows/Go/badge.svg)](https://github.com/richardartoul/molecule/actions)
# Molecule
Molecule is a Go library for parsing protobufs in an efficient and zero-allocation manner. The API is loosely based on [this excellent](https://github.com/buger/jsonparser) Go JSON parsing library.
This library is in alpha and the API could change. The current APIs are fairly low level, but additional helpers may be added in the future to make certain operations more ergonomic.
## Rationale
The standard `Unmarshal` protobuf interface in Go makes it difficult to manually control allocations when parsing protobufs. In addition, its common to only require access to a subset of an individual protobuf's fields. These issues make it hard to use protobuf in performance critical paths.
This library attempts to solve those problems by introducing a streaming, zero-allocation interface that allows users to have complete control over which fields are parsed, and how/when objects are allocated.
The downside, of course, is that `molecule` is more difficult to use (and easier to misuse) than the standard protobuf libraries so its recommended that it only be used in situations where performance is important. It is not a general purpose replacement for `proto.Unmarshal()`. It is recommended that users familiarize themselves with the [proto3 encoding](https://developers.google.com/protocol-buffers/docs/encoding) before attempting to use this library.
## Features
1. Unmarshal all protobuf primitive types with a streaming, zero-allocation API.
2. Support for iterating through protobuf messages in a streaming fashion.
3. Support for iterating through packed protobuf repeated fields (arrays) in a streaming fashion.
## Not Supported
1. Proto2 syntax (some things will probably work, but nothing is tested).
2. Repeated fields encoded not using the "packed" encoding (although in theory they can be parsed using this library, there just aren't any special helpers).
3. Map fields. It *should* be possible to parse maps using this library's API, but it would be a bit tedious. I plan on adding better support for this once I settle on a reasonable API.
4. Probably lots of other things.
## Examples
The [godocs](https://godoc.org/github.com/richardartoul/molecule) have numerous runnable examples.
## Attributions
This library is mostly a thin wrapper around other people's work:
1. The interface was inspired by this [jsonparser](https://github.com/buger/jsonparser) library.
2. The codec for interacting with protobuf streams was lifted from this [protobuf reflection library](https://github.com/jhump/protoreflect). The code was manually vendored instead of imported to reduce dependencies.
## Dependencies
The core `molecule` library has zero external dependencies. The `go.sum` file does contain some dependencies introduced from the tests package, however,
those *should* not be included transitively when using this library.

@ -0,0 +1,45 @@
package molecule
// Package molecule is a Go library for parsing and encoding protobufs in an
// efficient and zero-allocation manner, progressively consuming or creating
// the encoded bytes in a streaming fashion instead of (un)marshaling full
// structs.
//
// While protobuf is not intended for use in infinite streams, such an
// interface is still useful for rapidly creating protobuf messages from data
// that is not already in easily-marshaled structs. In other words, this
// package can save a substantial amount of copying and allocation over an
// equivalent implementation using more typical struct marshaling.
//
// The drawback is that the implementation requires a more detailed
// understanding of the protobuf encoding, including the field numbers and
// types for the messages being encoded.
//
// ## Decoding
//
// To decode a protobuf message, create a `codec.Buffer` wrapping the encoded
// data, and then use `molecule.MessageEach` to iterate over each field in that
// message.
//
// See the examples for more details.
//
// ## Encoding
//
// Begin by creating a new `ProtoStream` with `New`, passing an `io.Writer`
// to which the output bytes should be written. In many cases this is simply
// a `bytes.Buffer`, but any writer will do.
//
// Next, for each field, call the method appropriate to its protobuf type,
// passing the field number as the first argument. For repeated fields
// containing scalar types, use the `*Packed` methods to write the packed form.
// For non-scalar repeated fields, call the encoding method repeatedly.
//
// This package requires a more detailed understanding of the protobuf encoding
// format than marshaling-based packages. In particular, the message fields
// and their types must be encoded in the source calling each of the methods.
// The field numbers are best encoded as `const` values in the source code.
// For protocol compatibility, such field numbers will never change, so there
// is no difficulty with synchronizing the values in two places. See the
// examples for details.
//
// Note that most methods will do nothing when given their zero value.

@ -0,0 +1,166 @@
package molecule
import (
"fmt"
"github.com/richardartoul/molecule/src/codec"
)
// MessageEachFn is a function that will be called for each top-level field in a
// message passed to MessageEach.
//
// The returned bool indicates whether iteration should continue; returning a
// non-nil error stops iteration and the error is propagated to the caller of
// MessageEach.
type MessageEachFn func(fieldNum int32, value Value) (bool, error)
// MessageEach iterates over each top-level field in the message stored in buffer
// and calls fn on each one.
//
// Iteration stops early when fn returns false or a non-nil error; the error
// (possibly nil) is returned to the caller.
func MessageEach(buffer *codec.Buffer, fn MessageEachFn) error {
	for !buffer.EOF() {
		v, err := buffer.DecodeVarint()
		if err != nil {
			return err
		}
		fieldNum, wireType, err := codec.AsTagAndWireType(v)
		if err != nil {
			return err
		}

		value := Value{
			WireType: wireType,
		}
		// Decode the field's payload according to its wire type.
		switch wireType {
		case codec.WireVarint:
			value.Number, err = buffer.DecodeVarint()
		case codec.WireFixed32:
			value.Number, err = buffer.DecodeFixed32()
		case codec.WireFixed64:
			value.Number, err = buffer.DecodeFixed64()
		case codec.WireBytes:
			value.Bytes, err = buffer.DecodeRawBytes(false)
		case codec.WireStartGroup, codec.WireEndGroup:
			err = fmt.Errorf("MessageEach: encountered group wire type: %d. Groups not supported", wireType)
		default:
			err = fmt.Errorf("MessageEach: unknown wireType: %d", wireType)
		}
		if err != nil {
			// Wrap with %w so callers can match the underlying error with
			// errors.Is / errors.As (same message as %v, but preserves the chain).
			return fmt.Errorf("MessageEach: error reading value from buffer: %w", err)
		}

		shouldContinue, err := fn(fieldNum, value)
		if err != nil || !shouldContinue {
			return err
		}
	}

	return nil
}
// Next populates the given value with the next field value in buffer and
// returns the field number, or an error if one was encountered while reading
// the next field value.
//
// Unlike MessageEach, Next decodes a single field per call, letting the
// caller drive iteration itself (for example to decode fields lazily).
func Next(buffer *codec.Buffer, value *Value) (fieldNum int32, err error) {
	var v uint64
	var wireType codec.WireType
	v, err = buffer.DecodeVarint()
	if err != nil {
		return
	}
	fieldNum, wireType, err = codec.AsTagAndWireType(v)
	if err != nil {
		return
	}

	value.WireType = wireType
	// Decode the field's payload according to its wire type.
	switch wireType {
	case codec.WireVarint:
		value.Number, err = buffer.DecodeVarint()
	case codec.WireFixed32:
		value.Number, err = buffer.DecodeFixed32()
	case codec.WireFixed64:
		value.Number, err = buffer.DecodeFixed64()
	case codec.WireBytes:
		value.Bytes, err = buffer.DecodeRawBytes(false)
	case codec.WireStartGroup, codec.WireEndGroup:
		// NOTE: messages previously said "MessageEach" here (copy-paste from
		// MessageEach); they now correctly identify Next as the source.
		err = fmt.Errorf("Next: encountered group wire type: %d. Groups not supported", wireType)
	default:
		err = fmt.Errorf("Next: unknown wireType: %d", wireType)
	}
	if err != nil {
		// Wrap with %w so callers can unwrap the underlying error.
		err = fmt.Errorf("Next: error reading value from buffer: %w", err)
		return
	}

	return
}
// PackedRepeatedEachFn is a function that is called for each value in a repeated field.
//
// The returned bool indicates whether iteration should continue; returning a
// non-nil error stops iteration and the error is propagated to the caller of
// PackedRepeatedEach.
type PackedRepeatedEachFn func(value Value) (bool, error)
// PackedRepeatedEach iterates over each value in the packed repeated field
// stored in buffer and calls fn on each one.
//
// The fieldType argument should match the type of the value stored in the repeated field.
//
// PackedRepeatedEach only supports repeated fields encoded using packed encoding.
//
// Iteration stops early when fn returns false or a non-nil error; the error
// (possibly nil) is returned to the caller.
func PackedRepeatedEach(buffer *codec.Buffer, fieldType codec.FieldType, fn PackedRepeatedEachFn) error {
	// Map the declared field type onto the wire type used to decode each
	// element of the packed payload.
	var wireType codec.WireType
	switch fieldType {
	case codec.FieldType_INT32,
		codec.FieldType_INT64,
		codec.FieldType_UINT32,
		codec.FieldType_UINT64,
		codec.FieldType_SINT32,
		codec.FieldType_SINT64,
		codec.FieldType_BOOL,
		codec.FieldType_ENUM:
		wireType = codec.WireVarint
	case codec.FieldType_FIXED64,
		codec.FieldType_SFIXED64,
		codec.FieldType_DOUBLE:
		wireType = codec.WireFixed64
	case codec.FieldType_FIXED32,
		codec.FieldType_SFIXED32,
		codec.FieldType_FLOAT:
		wireType = codec.WireFixed32
	case codec.FieldType_STRING,
		codec.FieldType_MESSAGE,
		codec.FieldType_BYTES:
		wireType = codec.WireBytes
	default:
		return fmt.Errorf(
			"PackedRepeatedEach: unknown field type: %v", fieldType)
	}

	for !buffer.EOF() {
		var err error
		value := Value{
			WireType: wireType,
		}
		switch wireType {
		case codec.WireVarint:
			value.Number, err = buffer.DecodeVarint()
		case codec.WireFixed32:
			value.Number, err = buffer.DecodeFixed32()
		case codec.WireFixed64:
			value.Number, err = buffer.DecodeFixed64()
		case codec.WireBytes:
			value.Bytes, err = buffer.DecodeRawBytes(false)
		case codec.WireStartGroup, codec.WireEndGroup:
			err = fmt.Errorf("PackedRepeatedEach: encountered group wire type: %d. Groups not supported", wireType)
		default:
			err = fmt.Errorf("PackedRepeatedEach: unknown wireType: %d", wireType)
		}
		if err != nil {
			// Wrap with %w so callers can match the underlying error with
			// errors.Is / errors.As (same message as %v, but preserves the chain).
			return fmt.Errorf("PackedRepeatedEach: error reading value from buffer: %w", err)
		}

		if shouldContinue, err := fn(value); err != nil || !shouldContinue {
			return err
		}
	}

	return nil
}

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@ -0,0 +1,5 @@
# Codec
This package is a modified version of code found in: https://github.com/jhump/protoreflect
The LICENSE file in this folder is the license for the original source code.

@ -0,0 +1,84 @@
// This file contains modifications from the original source code found in: https://github.com/jhump/protoreflect
package codec
import (
"fmt"
"io"
)
// Buffer is a reader and a writer that wraps a slice of bytes and also
// provides API for decoding and encoding the protobuf binary format.
//
// Its operation is similar to that of a bytes.Buffer: writing pushes
// data to the end of the buffer while reading pops data from the head
// of the buffer. So the same buffer can be used to both read and write.
type Buffer struct {
	buf   []byte // backing slice holding the buffer's contents
	index int    // read position: offset of the next byte to consume from buf
	len   int    // cached len(buf), kept in sync by NewBuffer/Reset
}
// NewBuffer creates a new buffer whose initial contents are the given slice
// of bytes. Reading starts at the beginning of the slice.
func NewBuffer(buf []byte) *Buffer {
	// index is deliberately left at its zero value: reads start at offset 0.
	return &Buffer{
		buf: buf,
		len: len(buf),
	}
}
// Reset replaces the buffer's contents with the given slice of bytes and
// rewinds the read position to its start. (The previous doc claimed Reset
// emptied the buffer; it actually installs buf as the new contents.)
func (cb *Buffer) Reset(buf []byte) {
	cb.buf = buf
	cb.index = 0
	cb.len = len(buf)
}
// Bytes returns the slice of bytes remaining in the buffer. Note that
// this does not perform a copy: if the contents of the returned slice
// are modified, the modifications will be visible to subsequent reads
// via the buffer. The read position is not advanced.
func (cb *Buffer) Bytes() []byte {
	return cb.buf[cb.index:]
}
// EOF returns true if there are no more bytes remaining to read, i.e. the
// read position has reached (or passed) the end of the buffer.
func (cb *Buffer) EOF() bool {
	return cb.index >= cb.len
}
// Skip attempts to skip the given number of bytes in the input. If the input
// has fewer bytes than the given count, io.ErrUnexpectedEOF is returned and
// the buffer is unchanged. Otherwise, the given number of bytes are skipped
// and nil is returned. (The previous doc described boolean returns; the
// function returns an error.)
func (cb *Buffer) Skip(count int) error {
	if count < 0 {
		return fmt.Errorf("proto: bad byte length %d", count)
	}
	newIndex := cb.index + count
	// newIndex < cb.index guards against integer overflow of the addition.
	if newIndex < cb.index || newIndex > cb.len {
		return io.ErrUnexpectedEOF
	}
	cb.index = newIndex
	return nil
}
// Len returns the remaining number of bytes in the buffer, i.e. those not yet
// consumed by reads.
func (cb *Buffer) Len() int {
	return cb.len - cb.index
}
// Read implements the io.Reader interface. If there are no bytes
// remaining in the buffer, it will return 0, io.EOF. Otherwise,
// it reads min(len(dest), cb.Len()) bytes from the input and copies
// them into dest. It returns the number of bytes copied and a nil
// error in this case. (The previous doc said "max", but copy transfers
// the minimum of the two lengths.)
func (cb *Buffer) Read(dest []byte) (int, error) {
	if cb.index == cb.len {
		return 0, io.EOF
	}
	copied := copy(dest, cb.buf[cb.index:])
	cb.index += copied
	return copied, nil
}

// Compile-time assertion that *Buffer satisfies io.Reader.
var _ io.Reader = (*Buffer)(nil)
@ -0,0 +1,42 @@
// This file contains modifications from the original source code found in: https://github.com/jhump/protoreflect
// Package codec contains all the logic required for interacting with the protobuf raw encoding. It also contains
// all the encoding and field type specific constants required for using the molecule library.
package codec
// WireType represents a protobuf encoding wire type.
type WireType int8

// Constants that identify the encoding of a value on the wire.
const (
	WireVarint     WireType = 0 // variable-length integer
	WireFixed64    WireType = 1 // fixed 8-byte value
	WireBytes      WireType = 2 // length-delimited byte payload
	WireStartGroup WireType = 3 // start-group delimiter (groups are not supported by molecule)
	WireEndGroup   WireType = 4 // end-group delimiter (groups are not supported by molecule)
	WireFixed32    WireType = 5 // fixed 4-byte value
)
// FieldType represents a protobuf field type.
type FieldType int32

// Constants that identify the declared type of a protobuf field.
// NOTE(review): the numbering appears to mirror the protobuf descriptor's
// field-type enum (TYPE_DOUBLE = 1, ...) — confirm against descriptor.proto
// before relying on it.
const (
	FieldType_DOUBLE   FieldType = 1
	FieldType_FLOAT    FieldType = 2
	FieldType_INT64    FieldType = 3
	FieldType_UINT64   FieldType = 4
	FieldType_INT32    FieldType = 5
	FieldType_FIXED64  FieldType = 6
	FieldType_FIXED32  FieldType = 7
	FieldType_BOOL     FieldType = 8
	FieldType_STRING   FieldType = 9
	FieldType_GROUP    FieldType = 10
	FieldType_MESSAGE  FieldType = 11
	FieldType_BYTES    FieldType = 12
	FieldType_UINT32   FieldType = 13
	FieldType_ENUM     FieldType = 14
	FieldType_SFIXED32 FieldType = 15
	FieldType_SFIXED64 FieldType = 16
	FieldType_SINT32   FieldType = 17
	FieldType_SINT64   FieldType = 18
)

@ -0,0 +1,482 @@
// This file contains modifications from the original source code found in: https://github.com/jhump/protoreflect
package codec
import (
"errors"
"fmt"
"io"
)
// ErrOverflow is returned when an integer is too large to be represented,
// e.g. a varint whose encoding exceeds 10 bytes or does not fit in 64 bits.
var ErrOverflow = errors.New("proto: integer overflow")

// ErrBadWireType is returned when decoding a wire-type from a buffer that
// is not valid, or when a tag decodes to a non-positive field number.
var ErrBadWireType = errors.New("proto: bad wiretype")
// The following lookup tables classify field types by the wire type used to
// encode them. They were previously populated in an init() function; composite
// literals express the same contents declaratively with no init-time side
// effects.

// varintTypes contains the field types encoded with the varint wire type.
var varintTypes = map[FieldType]bool{
	FieldType_BOOL:   true,
	FieldType_INT32:  true,
	FieldType_INT64:  true,
	FieldType_UINT32: true,
	FieldType_UINT64: true,
	FieldType_SINT32: true,
	FieldType_SINT64: true,
	FieldType_ENUM:   true,
}

// fixed32Types contains the field types encoded with the fixed32 wire type.
var fixed32Types = map[FieldType]bool{
	FieldType_FIXED32:  true,
	FieldType_SFIXED32: true,
	FieldType_FLOAT:    true,
}

// fixed64Types contains the field types encoded with the fixed64 wire type.
var fixed64Types = map[FieldType]bool{
	FieldType_FIXED64:  true,
	FieldType_SFIXED64: true,
	FieldType_DOUBLE:   true,
}
// DecodeVarint reads a varint-encoded integer from the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
//
// This implementation is inlined from https://github.com/dennwc/varint to avoid the call-site overhead
//
// The function is deliberately unrolled into two copies of the same 10-step
// decode: a fast path taken when at least 10 bytes (the maximum varint
// length) remain, which needs no per-byte end-of-buffer checks, and a slow
// path that checks for EOF after every byte.
func (cb *Buffer) DecodeVarint() (uint64, error) {
	if cb.Len() == 0 {
		return 0, io.ErrUnexpectedEOF
	}
	const (
		step = 7        // each byte contributes 7 payload bits
		bit  = 1 << 7   // continuation bit: set means "more bytes follow"
		mask = bit - 1  // low 7 bits of each byte are payload
	)
	// Fast path: enough bytes remain that no bounds checks are needed.
	if cb.Len() >= 10 {
		// i == 0
		b := cb.buf[cb.index]
		if b < bit {
			cb.index++
			return uint64(b), nil
		}
		x := uint64(b & mask)
		var s uint = step
		// i == 1
		b = cb.buf[cb.index+1]
		if b < bit {
			cb.index += 2
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 2
		b = cb.buf[cb.index+2]
		if b < bit {
			cb.index += 3
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 3
		b = cb.buf[cb.index+3]
		if b < bit {
			cb.index += 4
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 4
		b = cb.buf[cb.index+4]
		if b < bit {
			cb.index += 5
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 5
		b = cb.buf[cb.index+5]
		if b < bit {
			cb.index += 6
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 6
		b = cb.buf[cb.index+6]
		if b < bit {
			cb.index += 7
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 7
		b = cb.buf[cb.index+7]
		if b < bit {
			cb.index += 8
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 8
		b = cb.buf[cb.index+8]
		if b < bit {
			cb.index += 9
			return x | uint64(b)<<s, nil
		}
		x |= uint64(b&mask) << s
		s += step
		// i == 9
		b = cb.buf[cb.index+9]
		if b < bit {
			// The 10th byte may contribute at most 1 bit (9*7 = 63 bits used).
			if b > 1 {
				return 0, ErrOverflow
			}
			cb.index += 10
			return x | uint64(b)<<s, nil
		} else if cb.Len() == 10 {
			return 0, io.ErrUnexpectedEOF
		}
		// More than 10 continuation bytes: distinguish overflow (a terminating
		// byte exists later) from a truncated buffer (no terminator at all).
		for _, b := range cb.buf[cb.index+10:] {
			if b < bit {
				return 0, ErrOverflow
			}
		}
		return 0, io.ErrUnexpectedEOF
	}
	// Slow path: fewer than 10 bytes remain, check for EOF after each byte.
	// i == 0
	b := cb.buf[cb.index]
	if b < bit {
		cb.index++
		return uint64(b), nil
	} else if cb.Len() == 1 {
		return 0, io.ErrUnexpectedEOF
	}
	x := uint64(b & mask)
	var s uint = step
	// i == 1
	b = cb.buf[cb.index+1]
	if b < bit {
		cb.index += 2
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 2 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 2
	b = cb.buf[cb.index+2]
	if b < bit {
		cb.index += 3
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 3 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 3
	b = cb.buf[cb.index+3]
	if b < bit {
		cb.index += 4
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 4 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 4
	b = cb.buf[cb.index+4]
	if b < bit {
		cb.index += 5
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 5 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 5
	b = cb.buf[cb.index+5]
	if b < bit {
		cb.index += 6
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 6 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 6
	b = cb.buf[cb.index+6]
	if b < bit {
		cb.index += 7
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 7 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 7
	b = cb.buf[cb.index+7]
	if b < bit {
		cb.index += 8
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 8 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 8
	b = cb.buf[cb.index+8]
	if b < bit {
		cb.index += 9
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 9 {
		return 0, io.ErrUnexpectedEOF
	}
	x |= uint64(b&mask) << s
	s += step
	// i == 9
	b = cb.buf[cb.index+9]
	if b < bit {
		// The 10th byte may contribute at most 1 bit (9*7 = 63 bits used).
		if b > 1 {
			return 0, ErrOverflow
		}
		cb.index += 10
		return x | uint64(b)<<s, nil
	} else if cb.Len() == 10 {
		return 0, io.ErrUnexpectedEOF
	}
	for _, b := range cb.buf[cb.index+10:] {
		if b < bit {
			return 0, ErrOverflow
		}
	}
	return 0, io.ErrUnexpectedEOF
}
// DecodeFixed64 reads a little-endian 64-bit integer from the Buffer.
// This is the format for the
// fixed64, sfixed64, and double protocol buffer types.
func (cb *Buffer) DecodeFixed64() (uint64, error) {
	end := cb.index + 8
	// end < cb.index guards against integer overflow of the addition.
	if end < cb.index || end > cb.len {
		return 0, io.ErrUnexpectedEOF
	}
	var x uint64
	for j, b := range cb.buf[cb.index:end] {
		x |= uint64(b) << (8 * j)
	}
	cb.index = end
	return x, nil
}
// DecodeFixed32 reads a little-endian 32-bit integer from the Buffer.
// This is the format for the
// fixed32, sfixed32, and float protocol buffer types.
func (cb *Buffer) DecodeFixed32() (uint64, error) {
	end := cb.index + 4
	// end < cb.index guards against integer overflow of the addition.
	if end < cb.index || end > cb.len {
		return 0, io.ErrUnexpectedEOF
	}
	var x uint64
	for j, b := range cb.buf[cb.index:end] {
		x |= uint64(b) << (8 * j)
	}
	cb.index = end
	return x, nil
}
// DecodeZigZag32 decodes a signed 32-bit integer from the given
// zig-zag encoded value: the magnitude is in the high bits and the
// sign in the lowest bit (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...).
func DecodeZigZag32(v uint64) int32 {
	return int32(uint32(v)>>1) ^ -int32(v&1)
}
// DecodeZigZag64 decodes a signed 64-bit integer from the given
// zig-zag encoded value: the magnitude is in the high bits and the
// sign in the lowest bit (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...).
func DecodeZigZag64(v uint64) int64 {
	return int64(v>>1) ^ -int64(v&1)
}
// DecodeRawBytes reads a count-delimited byte buffer from the Buffer.
// This is the format used for the bytes protocol buffer
// type and for embedded messages. If alloc is true the bytes are copied
// into a freshly allocated slice; otherwise a view into the buffer's
// backing array is returned.
func (cb *Buffer) DecodeRawBytes(alloc bool) ([]byte, error) {
	length, err := cb.DecodeVarint()
	if err != nil {
		return nil, err
	}
	n := int(length)
	if n < 0 {
		return nil, fmt.Errorf("proto: bad byte length %d", n)
	}
	end := cb.index + n
	// end < cb.index guards against integer overflow of the addition.
	if end < cb.index || end > cb.len {
		return nil, io.ErrUnexpectedEOF
	}
	if alloc {
		out := make([]byte, n)
		copy(out, cb.buf[cb.index:])
		cb.index = end
		return out, nil
	}
	// Cap the returned slice at end so that it is not possible to read past
	// the end of this view via append or re-slicing.
	out := cb.buf[cb.index:end:end]
	cb.index = end
	return out, nil
}
// ReadGroup reads the input until a "group end" tag is found
// and returns the data up to that point. Subsequent reads from
// the buffer will read data after the group end tag. If alloc
// is true, the data is copied to a new slice before being returned.
// Otherwise, the returned slice is a view into the buffer's
// underlying byte slice.
//
// This function correctly handles nested groups: if a "group start"
// tag is found, then that group's end tag will be included in the
// returned data.
func (cb *Buffer) ReadGroup(alloc bool) ([]byte, error) {
	// The previous version declared groupEnd/dataEnd with a redundant `var`
	// statement immediately re-assigned by `:=`; declare them once here.
	groupEnd, dataEnd, err := cb.findGroupEnd()
	if err != nil {
		return nil, err
	}
	var results []byte
	if alloc {
		results = make([]byte, dataEnd-cb.index)
		copy(results, cb.buf[cb.index:])
	} else {
		results = cb.buf[cb.index:dataEnd]
	}
	// Advance past the group-end tag itself.
	cb.index = groupEnd
	return results, nil
}
// SkipGroup is like ReadGroup, except that it discards the
// data and just advances the buffer to point to the input
// right *after* the "group end" tag.
func (cb *Buffer) SkipGroup() error {
	end, _, err := cb.findGroupEnd()
	if err == nil {
		cb.index = end
	}
	return err
}
// findGroupEnd scans forward from the current read position for the matching
// "group end" tag without consuming any input. It returns groupEnd, the
// offset just past the end-group tag, and dataEnd, the offset where that tag
// begins (i.e. the end of the group's payload). The read position is always
// restored before returning.
func (cb *Buffer) findGroupEnd() (groupEnd int, dataEnd int, err error) {
	bs := cb.buf
	start := cb.index
	// The scan below advances cb.index; always rewind to the starting offset
	// so this function has no visible side effect on the buffer.
	defer func() {
		cb.index = start
	}()
	for {
		fieldStart := cb.index
		// read a field tag
		var v uint64
		v, err = cb.DecodeVarint()
		if err != nil {
			return 0, 0, err
		}
		_, wireType, err := AsTagAndWireType(v)
		if err != nil {
			return 0, 0, err
		}
		// skip past the field's data
		switch wireType {
		case WireFixed32:
			if err := cb.Skip(4); err != nil {
				return 0, 0, err
			}
		case WireFixed64:
			if err := cb.Skip(8); err != nil {
				return 0, 0, err
			}
		case WireVarint:
			// skip varint by finding last byte (has high bit unset)
			i := cb.index
			limit := i + 10 // varint cannot be >10 bytes
			for {
				if i >= limit {
					return 0, 0, ErrOverflow
				}
				if i >= len(bs) {
					return 0, 0, io.ErrUnexpectedEOF
				}
				if bs[i]&0x80 == 0 {
					break
				}
				i++
			}
			// TODO: This would only overflow if buffer length was MaxInt and we
			// read the last byte. This is not a real/feasible concern on 64-bit
			// systems. Something to worry about for 32-bit systems? Do we care?
			cb.index = i + 1
		case WireBytes:
			l, err := cb.DecodeVarint()
			if err != nil {
				return 0, 0, err
			}
			if err := cb.Skip(int(l)); err != nil {
				return 0, 0, err
			}
		case WireStartGroup:
			// Nested group: recurse (via SkipGroup) to jump past it entirely,
			// including its own end tag.
			if err := cb.SkipGroup(); err != nil {
				return 0, 0, err
			}
		case WireEndGroup:
			// Found our matching end tag: payload ends where the tag starts.
			return cb.index, fieldStart, nil
		default:
			return 0, 0, ErrBadWireType
		}
	}
}
// AsTagAndWireType converts the given varint into a field number (tag) and
// wireType.
//
// As of now, this function is inlined. Please double check that any modifications do not modify the
// inline eligibility
func AsTagAndWireType(v uint64) (tag int32, wireType WireType, err error) {
	// low 3 bits hold the wire type (previous comment wrongly said 7 bits,
	// and the tag/wire-type comments were attached to the wrong lines)
	wireType = WireType(v & 7)
	// the remaining high bits hold the int32 tag (field) number
	tag = int32(v >> 3)
	if tag <= 0 {
		err = ErrBadWireType // We return a constant error here as this allows the function to be inlined
	}
	return
}

@ -0,0 +1,27 @@
Copyright (c) 2018 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,137 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package protowire parses and formats the raw wire encoding.
// See https://developers.google.com/protocol-buffers/docs/encoding.
//
// For marshaling and unmarshaling entire protobuf messages,
// use the "google.golang.org/protobuf/proto" package instead.
package protowire
// This file has been modified from the original:
//
// * remove items not needed for Molecule (methods, constants)
// * remove anything those depended on
// Type represents the wire type.
type Type int8
const (
VarintType Type = 0
Fixed32Type Type = 5
Fixed64Type Type = 1
BytesType Type = 2
StartGroupType Type = 3
EndGroupType Type = 4
)
// AppendVarint appends v to b as a varint-encoded uint64.
//
// The encoding emits seven bits per byte, least-significant group first,
// with the continuation bit (0x80) set on every byte except the last.
func AppendVarint(b []byte, v uint64) []byte {
	for v >= 0x80 {
		b = append(b, byte(v)|0x80)
		v >>= 7
	}
	return append(b, byte(v))
}
// AppendFixed32 appends v to b as a little-endian uint32 (always 4 bytes).
func AppendFixed32(b []byte, v uint32) []byte {
	for i := 0; i < 4; i++ {
		b = append(b, byte(v))
		v >>= 8
	}
	return b
}
// AppendFixed64 appends v to b as a little-endian uint64 (always 8 bytes).
func AppendFixed64(b []byte, v uint64) []byte {
	for i := 0; i < 8; i++ {
		b = append(b, byte(v))
		v >>= 8
	}
	return b
}
// EncodeZigZag encodes an int64 as a zig-zag-encoded uint64.
// Input: {…, -3, -2, -1, 0, +1, +2, +3, …}
// Output: {…, 5, 3, 1, 0, 2, 4, 6, …}
func EncodeZigZag(x int64) uint64 {
	// The arithmetic shift yields an all-ones mask for negative x and zero
	// otherwise; XORing with it folds negatives onto the odd integers.
	sign := uint64(x >> 63)
	return uint64(x)<<1 ^ sign
}

@ -0,0 +1,506 @@
package molecule
import (
"bytes"
"io"
"math"
"github.com/richardartoul/molecule/src/protowire"
)
const (
// The defaultBufferSize is the default size for buffers used for embedded
// values, which must first be written to a buffer to determine their
// length. This is not used if BufferFactory is set.
defaultBufferSize int = 1024 * 8
)
// A ProtoStream supports writing protobuf data in a streaming fashion. Its methods
// will write their output to the wrapped `io.Writer`. Zero values are not included.
//
// ProtoStream instances are *not* threadsafe and *not* re-entrant.
type ProtoStream struct {
// The outputWriter is the writer to which the protobuf-encoded bytes are
// written.
outputWriter io.Writer
// The scratchBuffer is a buffer used and re-used for generating output.
// Each method should begin by resetting this buffer.
scratchBuffer []byte
// The scratchArray is a second, very small array used for packed
// encodings. It is large enough to fit two max-size varints (10 bytes
// each) without reallocation
scratchArray [20]byte
// The childStream is a ProtoStream used to implement `Embedded`, and
// reused for multiple calls.
childStream *ProtoStream
// The childBuffer is the buffer to which `childStream` writes.
childBuffer *bytes.Buffer
// The BufferFactory creates new, empty buffers as needed. Users may
// override this function to provide pre-initialized buffers of a larger
// size, or from a buffer pool, for example.
BufferFactory func() []byte
}
// NewProtoStream creates a new ProtoStream writing to the given Writer. If the
// writer is nil, the stream cannot be used until it has been set with `Reset`.
func NewProtoStream(outputWriter io.Writer) *ProtoStream {
	// childStream and childBuffer stay nil until Embedded first needs them.
	ps := &ProtoStream{outputWriter: outputWriter}
	ps.scratchBuffer = make([]byte, 0, defaultBufferSize)
	ps.BufferFactory = func() []byte { return make([]byte, 0, defaultBufferSize) }
	return ps
}
// Reset sets the Writer to which this ProtoStream streams. If the writer is nil,
// then the protostream cannot be used until Reset is called with a non-nil value.
func (ps *ProtoStream) Reset(outputWriter io.Writer) {
ps.outputWriter = outputWriter
ps.scratchBuffer = ps.scratchBuffer[:0]
}
// Double writes a value of proto type double to the stream.
func (ps *ProtoStream) Double(fieldNumber int, value float64) error {
if value == 0.0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.Fixed64Type)
ps.scratchBuffer = protowire.AppendFixed64(ps.scratchBuffer, math.Float64bits(value))
return ps.writeScratch()
}
// DoublePacked writes a slice of values of proto type double to the stream,
// in packed form.
func (ps *ProtoStream) DoublePacked(fieldNumber int, values []float64) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendFixed64(ps.scratchBuffer, math.Float64bits(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Float writes a value of proto type double to the stream.
func (ps *ProtoStream) Float(fieldNumber int, value float32) error {
if value == 0.0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.Fixed32Type)
ps.scratchBuffer = protowire.AppendFixed32(ps.scratchBuffer, math.Float32bits(value))
return ps.writeScratch()
}
// FloatPacked writes a slice of values of proto type float to the stream,
// in packed form.
func (ps *ProtoStream) FloatPacked(fieldNumber int, values []float32) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendFixed32(ps.scratchBuffer, math.Float32bits(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Int32 writes a value of proto type int32 to the stream.
func (ps *ProtoStream) Int32(fieldNumber int, value int32) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(value))
return ps.writeScratch()
}
// Int32Packed writes a slice of values of proto type int32 to the stream,
// in packed form.
func (ps *ProtoStream) Int32Packed(fieldNumber int, values []int32) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Int64 writes a value of proto type int64 to the stream.
func (ps *ProtoStream) Int64(fieldNumber int, value int64) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(value))
return ps.writeScratch()
}
// Int64Packed writes a slice of values of proto type int64 to the stream,
// in packed form.
func (ps *ProtoStream) Int64Packed(fieldNumber int, values []int64) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Uint32 writes a value of proto type uint32 to the stream.
func (ps *ProtoStream) Uint32(fieldNumber int, value uint32) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(value))
return ps.writeScratch()
}
// Uint32Packed writes a slice of values of proto type uint32 to the stream,
// in packed form.
func (ps *ProtoStream) Uint32Packed(fieldNumber int, values []uint32) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Uint64 writes a value of proto type uint64 to the stream.
func (ps *ProtoStream) Uint64(fieldNumber int, value uint64) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, value)
return ps.writeScratch()
}
// Uint64Packed writes a slice of values of proto type uint64 to the stream,
// in packed form.
func (ps *ProtoStream) Uint64Packed(fieldNumber int, values []uint64) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, value)
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Sint32 writes a value of proto type sint32 to the stream.
func (ps *ProtoStream) Sint32(fieldNumber int, value int32) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, protowire.EncodeZigZag(int64(value)))
return ps.writeScratch()
}
// Sint32Packed writes a slice of values of proto type sint32 to the stream,
// in packed form.
func (ps *ProtoStream) Sint32Packed(fieldNumber int, values []int32) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, protowire.EncodeZigZag(int64(value)))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Sint64 writes a value of proto type sint64 to the stream.
// Zero values are skipped entirely (proto3 semantics).
func (ps *ProtoStream) Sint64(fieldNumber int, value int64) error {
	if value == 0 {
		return nil
	}
	ps.scratchBuffer = ps.scratchBuffer[:0]
	ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
	// Use protowire.EncodeZigZag for consistency with Sint32; it is
	// bit-for-bit equivalent to the local zigzag64 helper.
	ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, protowire.EncodeZigZag(value))
	return ps.writeScratch()
}
// Sint64Packed writes a slice of values of proto type sint64 to the stream,
// in packed form. Empty slices are skipped entirely.
func (ps *ProtoStream) Sint64Packed(fieldNumber int, values []int64) error {
	if len(values) == 0 {
		return nil
	}
	ps.scratchBuffer = ps.scratchBuffer[:0]
	for _, value := range values {
		// Use protowire.EncodeZigZag for consistency with Sint32Packed; it
		// is bit-for-bit equivalent to the local zigzag64 helper.
		ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, protowire.EncodeZigZag(value))
	}
	return ps.writeScratchAsPacked(fieldNumber)
}
// Fixed32 writes a value of proto type fixed32 to the stream.
func (ps *ProtoStream) Fixed32(fieldNumber int, value uint32) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.Fixed32Type)
ps.scratchBuffer = protowire.AppendFixed32(ps.scratchBuffer, value)
return ps.writeScratch()
}
// Fixed32Packed writes a slice of values of proto type fixed32 to the stream,
// in packed form.
func (ps *ProtoStream) Fixed32Packed(fieldNumber int, values []uint32) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendFixed32(ps.scratchBuffer, value)
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Fixed64 writes a value of proto type fixed64 to the stream.
func (ps *ProtoStream) Fixed64(fieldNumber int, value uint64) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.Fixed64Type)
ps.scratchBuffer = protowire.AppendFixed64(ps.scratchBuffer, value)
return ps.writeScratch()
}
// Fixed64Packed writes a slice of values of proto type fixed64 to the stream,
// in packed form.
func (ps *ProtoStream) Fixed64Packed(fieldNumber int, values []uint64) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendFixed64(ps.scratchBuffer, value)
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Sfixed32 writes a value of proto type sfixed32 to the stream.
func (ps *ProtoStream) Sfixed32(fieldNumber int, value int32) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.Fixed32Type)
ps.scratchBuffer = protowire.AppendFixed32(ps.scratchBuffer, uint32(value))
return ps.writeScratch()
}
// Sfixed32Packed writes a slice of values of proto type sfixed32 to the stream,
// in packed form.
func (ps *ProtoStream) Sfixed32Packed(fieldNumber int, values []int32) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendFixed32(ps.scratchBuffer, uint32(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Sfixed64 writes a value of proto type sfixed64 to the stream.
func (ps *ProtoStream) Sfixed64(fieldNumber int, value int64) error {
if value == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.Fixed64Type)
ps.scratchBuffer = protowire.AppendFixed64(ps.scratchBuffer, uint64(value))
return ps.writeScratch()
}
// Sfixed64Packed writes a slice of values of proto type sfixed64 to the stream,
// in packed form.
func (ps *ProtoStream) Sfixed64Packed(fieldNumber int, values []int64) error {
if len(values) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
for _, value := range values {
ps.scratchBuffer = protowire.AppendFixed64(ps.scratchBuffer, uint64(value))
}
return ps.writeScratchAsPacked(fieldNumber)
}
// Bool writes a value of proto type bool to the stream.
// As with all zero values, false is skipped entirely (proto3 semantics).
func (ps *ProtoStream) Bool(fieldNumber int, value bool) error {
	// Idiomatic negation instead of `value == false` (staticcheck S1002).
	if !value {
		return nil
	}
	ps.scratchBuffer = ps.scratchBuffer[:0]
	ps.encodeKeyToScratch(fieldNumber, protowire.VarintType)
	// Only true reaches this point, and true encodes as varint 1; the
	// previous conditional assignment to a bit variable was dead code.
	ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, 1)
	return ps.writeScratch()
}
// String writes a string to the stream.
func (ps *ProtoStream) String(fieldNumber int, value string) error {
if len(value) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.BytesType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(len(value)))
err := ps.writeScratch()
if err != nil {
return err
}
return ps.writeAllString(value)
}
// Bytes writes the given bytes to the stream.
func (ps *ProtoStream) Bytes(fieldNumber int, value []byte) error {
if len(value) == 0 {
return nil
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.BytesType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(len(value)))
err := ps.writeScratch()
if err != nil {
return err
}
return ps.writeAll(value)
}
// Embedded is used for constructing embedded messages. It calls the given
// function with a new ProtoStream, then embeds the result in the current
// stream.
//
// NOTE: if the inner function creates an empty message (such as for a struct
// at its zero value), that empty message will still be added to the stream.
func (ps *ProtoStream) Embedded(fieldNumber int, inner func(*ProtoStream) error) error {
// Create a new child, writing to a buffer, if one does not already exist.
if ps.childStream == nil {
ps.childBuffer = bytes.NewBuffer(ps.BufferFactory())
ps.childStream = NewProtoStream(ps.childBuffer)
}
// Write the embedded value using the child, leaving the result in ps.childBuffer.
ps.childBuffer.Reset()
err := inner(ps.childStream)
if err != nil {
return err
}
ps.scratchBuffer = ps.scratchBuffer[:0]
ps.encodeKeyToScratch(fieldNumber, protowire.BytesType)
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(ps.childBuffer.Len()))
// Write the key and length prefix.
err = ps.writeScratch()
if err != nil {
return err
}
// Write out the embedded message.
return ps.writeAll(ps.childBuffer.Bytes())
}
// writeScratch flushes the scratch buffer to output.
func (ps *ProtoStream) writeScratch() error {
return ps.writeAll(ps.scratchBuffer)
}
// writeScratchAsPacked writes the scratch buffer to outputWriter, prefixed with
// the given key and the length of the scratch buffer. This is used for packed
// encodings.
func (ps *ProtoStream) writeScratchAsPacked(fieldNumber int) error {
	// The scratch buffer is full of the packed data, but we need to write
	// the key and size, so we use scratchArray. We could use a stack allocation
	// here, but as of writing the go compiler is not smart enough to figure out
	// that the value does not escape.
	keysize := ps.scratchArray[:0]
	keysize = protowire.AppendVarint(keysize, uint64((fieldNumber<<3)|int(protowire.BytesType)))
	keysize = protowire.AppendVarint(keysize, uint64(len(ps.scratchBuffer)))
	// Write the key and length prefix.
	if err := ps.writeAll(keysize); err != nil {
		return err
	}
	// Write out the packed payload accumulated in the scratch buffer.
	return ps.writeScratch()
}
// writeAll writes an entire buffer to output.
func (ps *ProtoStream) writeAll(buf []byte) error {
for len(buf) > 0 {
n, err := ps.outputWriter.Write(buf)
if err != nil {
return err
}
buf = buf[n:]
}
return nil
}
// writeAllString writes an entire string to output, using io.WriteString
// to avoid allocation.
func (ps *ProtoStream) writeAllString(value string) error {
for len(value) > 0 {
n, err := io.WriteString(ps.outputWriter, value)
if err != nil {
return err
}
value = value[n:]
}
return nil
}
// encodeKeyToScratch encodes a protobuf key into ps.scratch.
func (ps *ProtoStream) encodeKeyToScratch(fieldNumber int, wireType protowire.Type) {
ps.scratchBuffer = protowire.AppendVarint(ps.scratchBuffer, uint64(fieldNumber)<<3+uint64(wireType))
}
// zigzag32 zig-zag encodes the low 32 bits of v; any higher bits are ignored.
func zigzag32(v uint64) uint64 {
	t := uint32(v)
	// Arithmetic shift yields an all-ones mask for negative inputs,
	// folding them onto the odd integers.
	mask := uint32(int32(t) >> 31)
	return uint64(t<<1 ^ mask)
}
// zigzag64 zig-zag encodes v, interpreted as an int64.
func zigzag64(v uint64) uint64 {
	// The mask is all ones when the sign bit is set, mapping negative
	// numbers onto the odd integers.
	mask := uint64(int64(v) >> 63)
	return v<<1 ^ mask
}

@ -0,0 +1,154 @@
package molecule
import (
"fmt"
"math"
"reflect"
"unsafe"
"github.com/richardartoul/molecule/src/codec"
)
// Value represents a protobuf value. It contains the original wiretype that the value
// was encoded with as well as a variety of helper methods for interpreting the raw
// value based on the field's actual type.
type Value struct {
// WireType is the protobuf wire type that was used to encode the field.
WireType codec.WireType
// Number will contain the value for any fields encoded with the
// following wire types:
//
// 1. varint
// 2. Fixed32
// 3. Fixed64
Number uint64
// Bytes will contain the value for any fields encoded with the
// following wire types:
//
// 1. bytes
//
// Bytes is an unsafe view over the bytes in the buffer. To obtain a "safe" copy
// call value.AsSafeBytes() or copy Bytes directly.
Bytes []byte
}
// AsDouble interprets the value as a double.
func (v *Value) AsDouble() (float64, error) {
return math.Float64frombits(v.Number), nil
}
// AsFloat interprets the value as a float.
func (v *Value) AsFloat() (float32, error) {
if v.Number > math.MaxUint32 {
return 0, fmt.Errorf("AsFloat: %d overflows float32", v.Number)
}
return math.Float32frombits(uint32(v.Number)), nil
}
// AsInt32 interprets the value as an int32.
func (v *Value) AsInt32() (int32, error) {
s := int64(v.Number)
if s > math.MaxInt32 {
return 0, fmt.Errorf("AsInt32: %d overflows int32", s)
}
if s < math.MinInt32 {
return 0, fmt.Errorf("AsInt32: %d underflows int32", s)
}
return int32(v.Number), nil
}
// AsInt64 interprets the value as an int64.
func (v *Value) AsInt64() (int64, error) {
return int64(v.Number), nil
}
// AsUint32 interprets the value as a uint32.
func (v *Value) AsUint32() (uint32, error) {
if v.Number > math.MaxUint32 {
return 0, fmt.Errorf("AsUInt32: %d overflows uint32", v.Number)
}
return uint32(v.Number), nil
}
// AsUint64 interprets the value as a uint64.
func (v *Value) AsUint64() (uint64, error) {
return v.Number, nil
}
// AsSint32 interprets the value as a sint32.
func (v *Value) AsSint32() (int32, error) {
if v.Number > math.MaxUint32 {
return 0, fmt.Errorf("AsSint32: %d overflows int32", v.Number)
}
return codec.DecodeZigZag32(v.Number), nil
}
// AsSint64 interprets the value as a sint64.
func (v *Value) AsSint64() (int64, error) {
return codec.DecodeZigZag64(v.Number), nil
}
// AsFixed32 interprets the value as a fixed32.
func (v *Value) AsFixed32() (uint32, error) {
if v.Number > math.MaxUint32 {
return 0, fmt.Errorf("AsFixed32: %d overflows int32", v.Number)
}
return uint32(v.Number), nil
}
// AsFixed64 interprets the value as a fixed64.
func (v *Value) AsFixed64() (uint64, error) {
	// v.Number is already a uint64; the previous uint64(...) conversion
	// was redundant.
	return v.Number, nil
}
// AsSFixed32 interprets the value as a SFixed32.
func (v *Value) AsSFixed32() (int32, error) {
if v.Number > math.MaxUint32 {
return 0, fmt.Errorf("AsSFixed32: %d overflows int32", v.Number)
}
return int32(v.Number), nil
}
// AsSFixed64 interprets the value as a SFixed64.
func (v *Value) AsSFixed64() (int64, error) {
return int64(v.Number), nil
}
// AsBool interprets the value as a bool.
//
// Per the protobuf wire format, a bool is a varint where any non-zero value
// is true; the canonical encoding is 1, but comparing against exactly 1 would
// wrongly decode non-canonical (yet valid) encodings as false.
func (v *Value) AsBool() (bool, error) {
	return v.Number != 0, nil
}
// AsStringUnsafe interprets the value as a string. The returned string is an unsafe view over
// the underlying bytes. Use AsStringSafe() to obtain a "safe" string that is a copy of the
// underlying data.
func (v *Value) AsStringUnsafe() (string, error) {
return unsafeBytesToString(v.Bytes), nil
}
// AsStringSafe interprets the value as a string by allocating a safe copy of the underlying data.
func (v *Value) AsStringSafe() (string, error) {
return string(v.Bytes), nil
}
// AsBytesUnsafe interprets the value as a byte slice. The returned []byte is an unsafe view over
// the underlying bytes. Use AsBytesSafe() to obtain a "safe" [] that is a copy of the
// underlying data.
func (v *Value) AsBytesUnsafe() ([]byte, error) {
return v.Bytes, nil
}
// AsBytesSafe interprets the value as a byte slice by allocating a safe copy of the underlying data.
func (v *Value) AsBytesSafe() ([]byte, error) {
return append([]byte(nil), v.Bytes...), nil
}
// unsafeBytesToString converts b to a string without copying the underlying
// bytes. The caller must guarantee b is never mutated while the returned
// string is alive, since Go assumes strings are immutable.
//
// NOTE(review): building a string through reflect.StringHeader is flagged by
// go vet on newer toolchains; unsafe.String (Go 1.20+) would be the preferred
// replacement once the module's minimum Go version allows it — confirm before
// changing, as reflect would then become an unused import in this file.
func unsafeBytesToString(b []byte) string {
	bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	var s string
	sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
	sh.Data = bh.Data
	sh.Len = bh.Len
	return s
}

@ -1271,6 +1271,11 @@ github.com/prometheus/prometheus/web/api/v1
# github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
## explicit
github.com/rcrowley/go-metrics
# github.com/richardartoul/molecule v1.0.0
## explicit; go 1.13
github.com/richardartoul/molecule
github.com/richardartoul/molecule/src/codec
github.com/richardartoul/molecule/src/protowire
# github.com/rootless-containers/rootlesskit v1.1.0
## explicit; go 1.19
github.com/rootless-containers/rootlesskit/pkg/api

Loading…
Cancel
Save