Add metadata to push payload (#9694)

**What this PR does / why we need it**:
We are adding support for attaching labels to each log line. This is one
of the series of the PRs broken up to make it easier to review changes.

This PR updates the push payload to send labels with each log entry
optionally. The log labels are supposed to be in the same format as the
stream labels. Just to put it out, here is how it would look for proto
and json push payload with same data:

**proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`,
`Content-Type`: `application/x-protobuf`)**(payload built using
[push.Stream](4cd1246b88/pkg/push/types.go (L12))):
```
push.Stream{
	Entries: []logproto.Entry{
			{
				Timestamp: time.Unix(0, 1688515200000000000),
				Line:      "log line",
				Labels:    `{foo="bar"}`,
			},
	},
	Labels: `{app="test"}`,
}
```

**v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`:
`application/json`)**:
```json
{
    "streams": [{
        "stream": {
            "app": "test"
        },
        "values": [
            ["1688515200000000000", "log line", {
                "foo": "bar"
            }]
        ]
    }]
}
```
**legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**:
```json
{
    "streams": [{
        "labels": "{app=\"test\"}",
        "entries": [{
                "ts": "2023-07-05T00:00:00.000000000Z",
                "line": "log line",
                "labels": "{foo=\"bar\"}"
            }]
    }]
}
```
**Which issue(s) this PR fixes**:

**Special notes for your reviewer**:

We may need to add more thoughtful tests.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

---------

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
pull/9902/head
Salva Corts 2 years ago committed by GitHub
parent 90fc5109dd
commit aae13c376d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      integration/client/client.go
  2. 1
      integration/loki_micro_services_delete_test.go
  3. 10
      integration/loki_micro_services_test.go
  4. 72
      pkg/loghttp/entry.go
  5. 128
      pkg/loghttp/query.go
  6. 36
      pkg/loghttp/query_test.go
  7. 1
      pkg/logproto/alias.go
  8. 201
      pkg/logproto/timeseries.go
  9. 21
      pkg/push/go.mod
  10. 40
      pkg/push/go.sum
  11. 424
      pkg/push/push.pb.go
  12. 9
      pkg/push/push.proto
  13. 264
      pkg/push/types.go
  14. 16
      pkg/push/types_test.go
  15. 4
      pkg/querier/queryrange/codec_test.go
  16. 33
      pkg/util/marshal/legacy/marshal_test.go
  17. 22
      pkg/util/marshal/marshal_test.go
  18. 33
      pkg/util/marshal/query.go
  19. 17
      pkg/util/unmarshal/legacy/unmarshal_test.go
  20. 24
      pkg/util/unmarshal/unmarshal.go
  21. 30
      pkg/util/unmarshal/unmarshal_test.go
  22. 424
      vendor/github.com/grafana/loki/pkg/push/push.pb.go
  23. 9
      vendor/github.com/grafana/loki/pkg/push/push.proto
  24. 264
      vendor/github.com/grafana/loki/pkg/push/types.go

@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
)
@ -86,13 +87,28 @@ func New(instanceID, token, baseURL string, opts ...Option) *Client {
// PushLogLine creates a new logline with the current time as timestamp
func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) error {
return c.pushLogLine(line, c.Now, extraLabels...)
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}
func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
}
// PushLogLineWithTimestamp creates a new logline at the given timestamp
// The timestamp has to be a Unix timestamp (epoch seconds)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
return c.pushLogLine(line, timestamp, extraLabelList...)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabels ...map[string]string) error {
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}
func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
// If the logLabels map is empty, labels.FromMap will allocate some empty slices.
// Since this code is executed for every log line we receive, as an optimization
// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
var lbls labels.Labels
if len(logLabels) > 0 {
lbls = labels.FromMap(logLabels)
}
return c.pushLogLine(line, timestamp, lbls, extraLabelList...)
}
func formatTS(ts time.Time) string {
@ -101,21 +117,22 @@ func formatTS(ts time.Time) string {
type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
Values [][]any `json:"values"`
}
// pushLogLine creates a new logline
func (c *Client) pushLogLine(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error {
apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)
s := stream{
Stream: map[string]string{
"job": "varlog",
},
Values: [][]string{
Values: [][]any{
{
formatTS(timestamp),
line,
logLabels,
},
},
}

@ -12,7 +12,6 @@ import (
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/storage"
)

@ -396,13 +396,13 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
cliQueryFrontend.Now = now
t.Run("ingest-logs", func(t *testing.T) {
// ingest logs to the previous period
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-36*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))
// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
})
t.Run("query-lookback-default", func(t *testing.T) {

@ -8,6 +8,7 @@ import (
"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"github.com/prometheus/prometheus/model/labels"
)
func init() {
@ -16,8 +17,9 @@ func init() {
// Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
Timestamp time.Time
Line string
Timestamp time.Time
Line string
NonIndexedLabels labels.Labels
}
func (e *Entry) UnmarshalJSON(data []byte) error {
@ -27,12 +29,12 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
switch i {
case 0: // timestamp
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
@ -40,12 +42,36 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // labels
if t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
var nonIndexedLabels labels.Labels
if err := jsonparser.ObjectEach(value, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: string(key),
Value: string(value),
})
return nil
}); err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
@ -67,6 +93,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var nonIndexedLabels labels.Labels
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
@ -81,15 +108,30 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
return false
}
return true
case 2:
iter.ReadMapCB(func(iter *jsoniter.Iterator, labelName string) bool {
labelValue := iter.ReadString()
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: labelName,
Value: labelValue,
})
return true
})
i++
if iter.Error != nil {
return false
}
return true
default:
iter.ReportError("error reading entry", "array must contains 2 values")
iter.ReportError("error reading entry", "array must have at least 2 and up to 3 values")
return false
}
})
if ok {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
Timestamp: ts,
Line: line,
NonIndexedLabels: nonIndexedLabels,
})
return true
}
@ -126,6 +168,18 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
if len(e.NonIndexedLabels) > 0 {
stream.WriteMore()
stream.WriteObjectStart()
for i, lbl := range e.NonIndexedLabels {
if i > 0 {
stream.WriteMore()
}
stream.WriteObjectField(lbl.Name)
stream.WriteString(lbl.Value)
}
stream.WriteObjectEnd()
}
stream.WriteArrayEnd()
}

@ -8,14 +8,14 @@ import (
"time"
"unsafe"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/buger/jsonparser"
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
)
var (
@ -57,9 +57,129 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error {
})
}
// PushRequest models a log stream push
// PushRequest models a log stream push but is unmarshalled to proto push format.
type PushRequest struct {
Streams []*Stream `json:"streams"`
Streams []LogProtoStream `json:"streams"`
}
// LogProtoStream helps with unmarshalling of each log stream for push request.
// This might look un-necessary but without it the CPU usage in benchmarks was increasing by ~25% :shrug:
type LogProtoStream logproto.Stream
func (s *LogProtoStream) UnmarshalJSON(data []byte) error {
err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
switch string(key) {
case "stream":
labels := make(LabelSet)
err := jsonparser.ObjectEach(val, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
labels[string(key)] = string(val)
return nil
})
if err != nil {
return err
}
s.Labels = labels.String()
case "values":
if ty == jsonparser.Null {
return nil
}
entries, err := unmarshalHTTPToLogProtoEntries(val)
if err != nil {
return err
}
s.Entries = entries
}
return nil
})
return err
}
func unmarshalHTTPToLogProtoEntries(data []byte) ([]logproto.Entry, error) {
var (
entries []logproto.Entry
parseError error
)
if _, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
if err != nil || parseError != nil {
return
}
if ty == jsonparser.Null {
return
}
e, err := unmarshalHTTPToLogProtoEntry(value)
if err != nil {
parseError = err
return
}
entries = append(entries, e)
}); err != nil {
parseError = err
}
if parseError != nil {
return nil, parseError
}
return entries, nil
}
func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) {
var (
i int
parseError error
e logproto.Entry
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if (i == 0 || i == 1) && t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
} else if i == 2 && t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
switch i {
case 0: // timestamp
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // nonIndexedLabels
var nonIndexedLabels labels.Labels
err := jsonparser.ObjectEach(value, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: string(key),
Value: string(val),
})
return nil
})
if err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
if parseError != nil {
return e, parseError
}
return e, err
}
// ResultType holds the type of the result

@ -8,6 +8,7 @@ import (
"time"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
@ -153,14 +154,20 @@ func TestStreams_ToProto(t *testing.T) {
Labels: map[string]string{"foo": "bar"},
Entries: []Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
{
Labels: map[string]string{"foo": "bar", "lvl": "error"},
Entries: []Entry{
{Timestamp: time.Unix(0, 3), Line: "3"},
{Timestamp: time.Unix(0, 4), Line: "4"},
{Timestamp: time.Unix(0, 4), Line: "4", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
@ -169,14 +176,20 @@ func TestStreams_ToProto(t *testing.T) {
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
{
Labels: `{foo="bar", lvl="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 3), Line: "3"},
{Timestamp: time.Unix(0, 4), Line: "4"},
{Timestamp: time.Unix(0, 4), Line: "4", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
@ -210,7 +223,10 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
Labels: LabelSet{"foo": "bar"},
Entries: []Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},
@ -230,7 +246,10 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
Labels: LabelSet{"foo": "bar"},
Entries: []Entry{
{Timestamp: time.Unix(0, 1), Line: "log line 1"},
{Timestamp: time.Unix(0, 2), Line: "some log line 2"},
{Timestamp: time.Unix(0, 2), Line: "some log line 2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
Stream{
@ -240,7 +259,10 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 2), Line: "2", NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
}},
},
},
},

@ -10,6 +10,7 @@ import (
type Entry = push.Entry
type Stream = push.Stream
type LabelAdapter = push.LabelAdapter
type PushRequest = push.PushRequest
type PushResponse = push.PushResponse
type PusherClient = push.PusherClient

@ -2,13 +2,7 @@ package logproto
import (
"flag"
"fmt"
"io"
"strings"
"sync"
"unsafe"
"github.com/prometheus/prometheus/model/labels"
)
var (
@ -70,201 +64,6 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
return p.TimeSeries.Unmarshal(dAtA)
}
// LabelAdapter is a labels.Label that can be marshalled to/from protos.
type LabelAdapter labels.Label
// Marshal implements proto.Marshaller.
func (bs *LabelAdapter) Marshal() ([]byte, error) {
size := bs.Size()
buf := make([]byte, size)
n, err := bs.MarshalToSizedBuffer(buf[:size])
if err != nil {
return nil, err
}
return buf[:n], err
}
func (bs *LabelAdapter) MarshalTo(dAtA []byte) (int, error) {
size := bs.Size()
return bs.MarshalToSizedBuffer(dAtA[:size])
}
// MarshalTo implements proto.Marshaller.
func (bs *LabelAdapter) MarshalToSizedBuffer(buf []byte) (n int, err error) {
ls := (*labels.Label)(bs)
i := len(buf)
if len(ls.Value) > 0 {
i -= len(ls.Value)
copy(buf[i:], ls.Value)
i = encodeVarintMetrics(buf, i, uint64(len(ls.Value)))
i--
buf[i] = 0x12
}
if len(ls.Name) > 0 {
i -= len(ls.Name)
copy(buf[i:], ls.Name)
i = encodeVarintMetrics(buf, i, uint64(len(ls.Name)))
i--
buf[i] = 0xa
}
return len(buf) - i, nil
}
// Unmarshal a LabelAdapter, implements proto.Unmarshaller.
// NB this is a copy of the autogenerated code to unmarshal a LabelPair,
// with the byte copying replaced with a yoloString.
func (bs *LabelAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LabelPair: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
bs.Name = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetrics
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthMetrics
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthMetrics
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
bs.Value = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipMetrics(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthMetrics
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func yoloString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
// Size implements proto.Sizer.
func (bs *LabelAdapter) Size() (n int) {
ls := (*labels.Label)(bs)
if bs == nil {
return 0
}
var l int
_ = l
l = len(ls.Name)
if l > 0 {
n += 1 + l + sovMetrics(uint64(l))
}
l = len(ls.Value)
if l > 0 {
n += 1 + l + sovMetrics(uint64(l))
}
return n
}
// Equal implements proto.Equaler.
func (bs *LabelAdapter) Equal(other LabelAdapter) bool {
return bs.Name == other.Name && bs.Value == other.Value
}
// Compare implements proto.Comparer.
func (bs *LabelAdapter) Compare(other LabelAdapter) int {
if c := strings.Compare(bs.Name, other.Name); c != 0 {
return c
}
return strings.Compare(bs.Value, other.Value)
}
// PreallocTimeseriesSliceFromPool retrieves a slice of PreallocTimeseries from a sync.Pool.
// ReuseSlice should be called once done.
func PreallocTimeseriesSliceFromPool() []PreallocTimeseries {

@ -4,20 +4,25 @@ go 1.19
require (
github.com/gogo/protobuf v1.3.2
github.com/stretchr/testify v1.8.1
github.com/prometheus/prometheus v0.43.1-0.20230419161410-69155c6ba1e9
github.com/stretchr/testify v1.8.2
google.golang.org/grpc v1.53.0
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
github.com/prometheus/common v0.42.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/protobuf v1.29.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

@ -1,3 +1,5 @@
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@ -5,10 +7,12 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd h1:PpuIBO5P3e9hpqBD0O/HjhShYuM6XE0i/lbE6J94kww=
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
@ -19,38 +23,44 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/prometheus v0.43.1-0.20230419161410-69155c6ba1e9 h1:GrpznPCSJgx8mGGj5qfKoHiou/dVx7uMce9/9rSdiuY=
github.com/prometheus/prometheus v0.43.1-0.20230419161410-69155c6ba1e9/go.mod h1:L8xLODXgpZM57D1MA7SPgsDecKj6ez4AF7mMczR1bis=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 h1:SChBja7BCQewoTAU7IgvucQKMIXrEpFxNMs0spT3/5s=
golang.org/x/exp v0.0.0-20230307190834-24139beb5833/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@ -59,17 +69,17 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.29.1 h1:7QBf+IK2gx70Ap/hDsOmam3GE0v9HicjfEdAxE62UoM=
google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -164,15 +164,67 @@ func (m *StreamAdapter) GetHash() uint64 {
return 0
}
type LabelPairAdapter struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *LabelPairAdapter) Reset() { *m = LabelPairAdapter{} }
func (*LabelPairAdapter) ProtoMessage() {}
func (*LabelPairAdapter) Descriptor() ([]byte, []int) {
return fileDescriptor_35ec442956852c9e, []int{3}
}
func (m *LabelPairAdapter) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *LabelPairAdapter) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_LabelPairAdapter.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *LabelPairAdapter) XXX_Merge(src proto.Message) {
xxx_messageInfo_LabelPairAdapter.Merge(m, src)
}
func (m *LabelPairAdapter) XXX_Size() int {
return m.Size()
}
func (m *LabelPairAdapter) XXX_DiscardUnknown() {
xxx_messageInfo_LabelPairAdapter.DiscardUnknown(m)
}
var xxx_messageInfo_LabelPairAdapter proto.InternalMessageInfo
func (m *LabelPairAdapter) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *LabelPairAdapter) GetValue() string {
if m != nil {
return m.Value
}
return ""
}
type EntryAdapter struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
NonIndexedLabels []LabelPairAdapter `protobuf:"bytes,3,rep,name=nonIndexedLabels,proto3" json:"nonIndexedLabels,omitempty"`
}
func (m *EntryAdapter) Reset() { *m = EntryAdapter{} }
func (*EntryAdapter) ProtoMessage() {}
func (*EntryAdapter) Descriptor() ([]byte, []int) {
return fileDescriptor_35ec442956852c9e, []int{3}
return fileDescriptor_35ec442956852c9e, []int{4}
}
func (m *EntryAdapter) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -215,44 +267,57 @@ func (m *EntryAdapter) GetLine() string {
return ""
}
func (m *EntryAdapter) GetNonIndexedLabels() []LabelPairAdapter {
if m != nil {
return m.NonIndexedLabels
}
return nil
}
func init() {
proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest")
proto.RegisterType((*PushResponse)(nil), "logproto.PushResponse")
proto.RegisterType((*StreamAdapter)(nil), "logproto.StreamAdapter")
proto.RegisterType((*LabelPairAdapter)(nil), "logproto.LabelPairAdapter")
proto.RegisterType((*EntryAdapter)(nil), "logproto.EntryAdapter")
}
func init() { proto.RegisterFile("pkg/push/push.proto", fileDescriptor_35ec442956852c9e) }
var fileDescriptor_35ec442956852c9e = []byte{
// 422 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x52, 0x41, 0x6f, 0xd3, 0x30,
0x18, 0xb5, 0xbb, 0xd2, 0x6d, 0xee, 0x18, 0x92, 0x61, 0xa3, 0x44, 0xc8, 0xae, 0x72, 0xea, 0x85,
0x44, 0x2a, 0x07, 0xce, 0x8d, 0x84, 0xb4, 0x23, 0x0a, 0x08, 0x24, 0x6e, 0x0e, 0x78, 0x4e, 0xb4,
0x24, 0x0e, 0xb1, 0x83, 0xc4, 0x8d, 0x9f, 0x30, 0xfe, 0x05, 0x3f, 0x65, 0xc7, 0x1e, 0x27, 0x0e,
0x81, 0xa6, 0x17, 0x94, 0xd3, 0x7e, 0x02, 0x8a, 0x13, 0xd3, 0xc2, 0xc5, 0x79, 0x7e, 0xfe, 0xfc,
0xbd, 0xf7, 0xbd, 0x18, 0x3d, 0x2c, 0xae, 0x84, 0x5f, 0x54, 0x2a, 0x36, 0x8b, 0x57, 0x94, 0x52,
0x4b, 0x7c, 0x94, 0x4a, 0x61, 0x90, 0xf3, 0x48, 0x48, 0x21, 0x0d, 0xf4, 0x3b, 0xd4, 0x9f, 0x3b,
0x54, 0x48, 0x29, 0x52, 0xee, 0x9b, 0x5d, 0x54, 0x5d, 0xfa, 0x3a, 0xc9, 0xb8, 0xd2, 0x2c, 0x2b,
0xfa, 0x02, 0xf7, 0x1d, 0x9a, 0xbe, 0xaa, 0x54, 0x1c, 0xf2, 0x4f, 0x15, 0x57, 0x1a, 0x5f, 0xa0,
0x43, 0xa5, 0x4b, 0xce, 0x32, 0x35, 0x83, 0xf3, 0x83, 0xc5, 0x74, 0xf9, 0xd8, 0xb3, 0x0a, 0xde,
0x6b, 0x73, 0xb0, 0xfa, 0xc8, 0x0a, 0xcd, 0xcb, 0xe0, 0xec, 0x47, 0x4d, 0x27, 0x3d, 0xd5, 0xd6,
0xd4, 0xde, 0x0a, 0x2d, 0x70, 0x4f, 0xd1, 0x49, 0xdf, 0x58, 0x15, 0x32, 0x57, 0xdc, 0xfd, 0x06,
0xd1, 0xfd, 0x7f, 0x3a, 0x60, 0x17, 0x4d, 0x52, 0x16, 0xf1, 0xb4, 0x93, 0x82, 0x8b, 0xe3, 0x00,
0xb5, 0x35, 0x1d, 0x98, 0x70, 0xf8, 0xe2, 0x15, 0x3a, 0xe4, 0xb9, 0x2e, 0x13, 0xae, 0x66, 0x23,
0xe3, 0xe7, 0x7c, 0xe7, 0xe7, 0x65, 0xae, 0xcb, 0x2f, 0xd6, 0xce, 0x83, 0x9b, 0x9a, 0x82, 0xce,
0xc8, 0x50, 0x1e, 0x5a, 0x80, 0x9f, 0xa0, 0x71, 0xcc, 0x54, 0x3c, 0x3b, 0x98, 0xc3, 0xc5, 0x38,
0xb8, 0xd7, 0xd6, 0x14, 0x3e, 0x0b, 0x0d, 0xe5, 0x7e, 0x46, 0x27, 0xfb, 0x4d, 0xf0, 0x05, 0x3a,
0xfe, 0x9b, 0x8f, 0x31, 0x35, 0x5d, 0x3a, 0x5e, 0x9f, 0xa0, 0x67, 0x13, 0xf4, 0xde, 0xd8, 0x8a,
0xe0, 0x74, 0xd0, 0x1c, 0x69, 0x75, 0xfd, 0x93, 0xc2, 0x70, 0x77, 0x19, 0x3f, 0x45, 0xe3, 0x34,
0xc9, 0xf9, 0x6c, 0x64, 0x26, 0x3b, 0x6a, 0x6b, 0x6a, 0xf6, 0xa1, 0x59, 0x97, 0x2b, 0x34, 0xe9,
0xb2, 0xe1, 0x25, 0x7e, 0x81, 0xc6, 0x1d, 0xc2, 0x67, 0xbb, 0xb1, 0xf6, 0x7e, 0x87, 0x73, 0xfe,
0x3f, 0x3d, 0x84, 0x09, 0x82, 0xb7, 0xeb, 0x0d, 0x01, 0xb7, 0x1b, 0x02, 0xee, 0x36, 0x04, 0x7e,
0x6d, 0x08, 0xfc, 0xde, 0x10, 0x78, 0xd3, 0x10, 0xb8, 0x6e, 0x08, 0xfc, 0xd5, 0x10, 0xf8, 0xbb,
0x21, 0xe0, 0xae, 0x21, 0xf0, 0x7a, 0x4b, 0xc0, 0x7a, 0x4b, 0xc0, 0xed, 0x96, 0x80, 0xf7, 0x73,
0x91, 0xe8, 0xb8, 0x8a, 0xbc, 0x0f, 0x32, 0xf3, 0x45, 0xc9, 0x2e, 0x59, 0xce, 0xfc, 0x54, 0x5e,
0x25, 0xbe, 0x7d, 0x5b, 0xd1, 0xc4, 0xa8, 0x3d, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0x5c, 0x30,
0xfc, 0xe9, 0x6e, 0x02, 0x00, 0x00,
// 498 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x53, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x6e, 0xda, 0x4e, 0x4a, 0xa9, 0x96, 0xb6, 0x04, 0x0b, 0xad, 0x23, 0x9f, 0x72,
0x00, 0x5b, 0x0a, 0x07, 0x2e, 0x5c, 0x62, 0x09, 0xa9, 0x48, 0x3d, 0x54, 0x06, 0x81, 0xc4, 0x6d,
0x43, 0xb6, 0xb6, 0xa9, 0xed, 0x35, 0xde, 0x35, 0xa2, 0x37, 0x3e, 0xa1, 0xfc, 0x05, 0x9f, 0xd2,
0x63, 0x8e, 0x15, 0x07, 0x43, 0x9c, 0x0b, 0xca, 0xa9, 0x9f, 0x80, 0xbc, 0xb6, 0x49, 0x09, 0x97,
0xf5, 0x9b, 0xb7, 0x33, 0xf3, 0x9e, 0x67, 0x6c, 0x78, 0x90, 0x5e, 0xf8, 0x4e, 0x9a, 0x8b, 0x40,
0x1d, 0x76, 0x9a, 0x71, 0xc9, 0xf1, 0x4e, 0xc4, 0x7d, 0x85, 0x8c, 0x43, 0x9f, 0xfb, 0x5c, 0x41,
0xa7, 0x42, 0xf5, 0xbd, 0x61, 0xfa, 0x9c, 0xfb, 0x11, 0x73, 0x54, 0x34, 0xcd, 0xcf, 0x1d, 0x19,
0xc6, 0x4c, 0x48, 0x1a, 0xa7, 0x75, 0x82, 0xf5, 0x0e, 0xfa, 0x67, 0xb9, 0x08, 0x3c, 0xf6, 0x29,
0x67, 0x42, 0xe2, 0x13, 0xd8, 0x16, 0x32, 0x63, 0x34, 0x16, 0x03, 0x34, 0xec, 0x8e, 0xfa, 0xe3,
0x87, 0x76, 0xab, 0x60, 0xbf, 0x56, 0x17, 0x93, 0x19, 0x4d, 0x25, 0xcb, 0xdc, 0xa3, 0x1f, 0x85,
0xd9, 0xab, 0xa9, 0x55, 0x61, 0xb6, 0x55, 0x5e, 0x0b, 0xac, 0x7d, 0xd8, 0xab, 0x1b, 0x8b, 0x94,
0x27, 0x82, 0x59, 0xdf, 0x10, 0xdc, 0xfb, 0xa7, 0x03, 0xb6, 0xa0, 0x17, 0xd1, 0x29, 0x8b, 0x2a,
0x29, 0x34, 0xda, 0x75, 0x61, 0x55, 0x98, 0x0d, 0xe3, 0x35, 0x4f, 0x3c, 0x81, 0x6d, 0x96, 0xc8,
0x2c, 0x64, 0x62, 0xd0, 0x51, 0x7e, 0x8e, 0xd7, 0x7e, 0x5e, 0x26, 0x32, 0xbb, 0x6c, 0xed, 0xdc,
0xbf, 0x2e, 0x4c, 0xad, 0x32, 0xd2, 0xa4, 0x7b, 0x2d, 0xc0, 0x8f, 0x40, 0x0f, 0xa8, 0x08, 0x06,
0xdd, 0x21, 0x1a, 0xe9, 0xee, 0xd6, 0xaa, 0x30, 0xd1, 0x53, 0x4f, 0x51, 0xd6, 0x0b, 0x38, 0x38,
0xad, 0x74, 0xce, 0x68, 0x98, 0xb5, 0xae, 0x30, 0xe8, 0x09, 0x8d, 0x59, 0xed, 0xc9, 0x53, 0x18,
0x1f, 0xc2, 0xd6, 0x67, 0x1a, 0xe5, 0x6c, 0xd0, 0x51, 0x64, 0x1d, 0x58, 0x25, 0x82, 0xbd, 0xbb,
0x1e, 0xf0, 0x09, 0xec, 0xfe, 0x1d, 0xaf, 0xaa, 0xef, 0x8f, 0x0d, 0xbb, 0x5e, 0x80, 0xdd, 0x2e,
0xc0, 0x7e, 0xd3, 0x66, 0xb8, 0xfb, 0x8d, 0xe5, 0x8e, 0x14, 0x57, 0x3f, 0x4d, 0xe4, 0xad, 0x8b,
0xf1, 0x63, 0xd0, 0xa3, 0x30, 0x69, 0xf4, 0xdc, 0x9d, 0x55, 0x61, 0xaa, 0xd8, 0x53, 0x27, 0xfe,
0x08, 0x07, 0x09, 0x4f, 0x5e, 0x25, 0x33, 0xf6, 0x85, 0xcd, 0x4e, 0xeb, 0x11, 0x76, 0xd5, 0x74,
0x8c, 0xf5, 0x74, 0x36, 0x5f, 0xcc, 0xb5, 0x1a, 0x39, 0x63, 0xb3, 0xf6, 0x09, 0x8f, 0x43, 0xc9,
0xe2, 0x54, 0x5e, 0x7a, 0xff, 0xf5, 0x1d, 0x4f, 0xa0, 0x57, 0xad, 0x91, 0x65, 0xf8, 0x39, 0xe8,
0x15, 0xc2, 0x47, 0x6b, 0x8d, 0x3b, 0x5f, 0x8e, 0x71, 0xbc, 0x49, 0x37, 0x7b, 0xd7, 0xdc, 0xb7,
0xf3, 0x05, 0xd1, 0x6e, 0x16, 0x44, 0xbb, 0x5d, 0x10, 0xf4, 0xb5, 0x24, 0xe8, 0x7b, 0x49, 0xd0,
0x75, 0x49, 0xd0, 0xbc, 0x24, 0xe8, 0x57, 0x49, 0xd0, 0xef, 0x92, 0x68, 0xb7, 0x25, 0x41, 0x57,
0x4b, 0xa2, 0xcd, 0x97, 0x44, 0xbb, 0x59, 0x12, 0xed, 0xfd, 0xd0, 0x0f, 0x65, 0x90, 0x4f, 0xed,
0x0f, 0x3c, 0x76, 0xfc, 0x8c, 0x9e, 0xd3, 0x84, 0x3a, 0x11, 0xbf, 0x08, 0x9d, 0xf6, 0x37, 0x98,
0xf6, 0x94, 0xda, 0xb3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9b, 0x6f, 0x19, 0xc8, 0x19, 0x03,
0x00, 0x00,
}
func (this *PushRequest) Equal(that interface{}) bool {
@ -340,6 +405,33 @@ func (this *StreamAdapter) Equal(that interface{}) bool {
}
return true
}
func (this *LabelPairAdapter) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*LabelPairAdapter)
if !ok {
that2, ok := that.(LabelPairAdapter)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Name != that1.Name {
return false
}
if this.Value != that1.Value {
return false
}
return true
}
func (this *EntryAdapter) Equal(that interface{}) bool {
if that == nil {
return this == nil
@ -365,6 +457,14 @@ func (this *EntryAdapter) Equal(that interface{}) bool {
if this.Line != that1.Line {
return false
}
if len(this.NonIndexedLabels) != len(that1.NonIndexedLabels) {
return false
}
for i := range this.NonIndexedLabels {
if !this.NonIndexedLabels[i].Equal(&that1.NonIndexedLabels[i]) {
return false
}
}
return true
}
func (this *PushRequest) GoString() string {
@ -404,14 +504,32 @@ func (this *StreamAdapter) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *EntryAdapter) GoString() string {
func (this *LabelPairAdapter) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s = append(s, "&push.LabelPairAdapter{")
s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *EntryAdapter) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&push.EntryAdapter{")
s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n")
s = append(s, "Line: "+fmt.Sprintf("%#v", this.Line)+",\n")
if this.NonIndexedLabels != nil {
vs := make([]*LabelPairAdapter, len(this.NonIndexedLabels))
for i := range vs {
vs[i] = &this.NonIndexedLabels[i]
}
s = append(s, "NonIndexedLabels: "+fmt.Sprintf("%#v", vs)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
@ -613,6 +731,43 @@ func (m *StreamAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *LabelPairAdapter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LabelPairAdapter) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LabelPairAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintPush(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintPush(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *EntryAdapter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -633,6 +788,20 @@ func (m *EntryAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.NonIndexedLabels) > 0 {
for iNdEx := len(m.NonIndexedLabels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.NonIndexedLabels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Line) > 0 {
i -= len(m.Line)
copy(dAtA[i:], m.Line)
@ -708,6 +877,23 @@ func (m *StreamAdapter) Size() (n int) {
return n
}
func (m *LabelPairAdapter) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
return n
}
func (m *EntryAdapter) Size() (n int) {
if m == nil {
return 0
@ -720,6 +906,12 @@ func (m *EntryAdapter) Size() (n int) {
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
if len(m.NonIndexedLabels) > 0 {
for _, e := range m.NonIndexedLabels {
l = e.Size()
n += 1 + l + sovPush(uint64(l))
}
}
return n
}
@ -765,13 +957,30 @@ func (this *StreamAdapter) String() string {
}, "")
return s
}
func (this *LabelPairAdapter) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&LabelPairAdapter{`,
`Name:` + fmt.Sprintf("%v", this.Name) + `,`,
`Value:` + fmt.Sprintf("%v", this.Value) + `,`,
`}`,
}, "")
return s
}
func (this *EntryAdapter) String() string {
if this == nil {
return "nil"
}
repeatedStringForNonIndexedLabels := "[]LabelPairAdapter{"
for _, f := range this.NonIndexedLabels {
repeatedStringForNonIndexedLabels += strings.Replace(strings.Replace(f.String(), "LabelPairAdapter", "LabelPairAdapter", 1), `&`, ``, 1) + ","
}
repeatedStringForNonIndexedLabels += "}"
s := strings.Join([]string{`&EntryAdapter{`,
`Timestamp:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timestamp), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Line:` + fmt.Sprintf("%v", this.Line) + `,`,
`NonIndexedLabels:` + repeatedStringForNonIndexedLabels + `,`,
`}`,
}, "")
return s
@ -1062,6 +1271,123 @@ func (m *StreamAdapter) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *LabelPairAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LabelPairAdapter: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPairAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *EntryAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1156,6 +1482,40 @@ func (m *EntryAdapter) Unmarshal(dAtA []byte) error {
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NonIndexedLabels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.NonIndexedLabels = append(m.NonIndexedLabels, LabelPairAdapter{})
if err := m.NonIndexedLabels[len(m.NonIndexedLabels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])

@ -30,6 +30,11 @@ message StreamAdapter {
uint64 hash = 3 [(gogoproto.jsontag) = "-"];
}
message LabelPairAdapter {
string name = 1;
string value = 2;
}
message EntryAdapter {
google.protobuf.Timestamp timestamp = 1 [
(gogoproto.stdtime) = true,
@ -37,4 +42,8 @@ message EntryAdapter {
(gogoproto.jsontag) = "ts"
];
string line = 2 [(gogoproto.jsontag) = "line"];
repeated LabelPairAdapter nonIndexedLabels = 3 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "nonIndexedLabels,omitempty"
];
}

@ -3,7 +3,11 @@ package push
import (
"fmt"
"io"
"strings"
"time"
"unsafe"
"github.com/prometheus/prometheus/model/labels"
)
// Stream contains a unique labels set as a string and a set of entries for it.
@ -17,8 +21,48 @@ type Stream struct {
// Entry is a log entry with a timestamp.
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
NonIndexedLabels labels.Labels `protobuf:"bytes,3,opt,name=nonIndexedLabels,proto3" json:"nonIndexedLabels,omitempty"`
}
type LabelAdapter labels.Label
func (m *LabelAdapter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LabelAdapter) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LabelAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintPush(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintPush(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *Stream) Marshal() (dAtA []byte, err error) {
@ -90,6 +134,20 @@ func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.NonIndexedLabels) > 0 {
for iNdEx := len(m.NonIndexedLabels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := (*LabelAdapter)(&m.NonIndexedLabels[iNdEx]).MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Line) > 0 {
i -= len(m.Line)
copy(dAtA[i:], m.Line)
@ -139,7 +197,7 @@ func (m *Stream) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field NonIndexedLabels", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
@ -341,6 +399,40 @@ func (m *Entry) Unmarshal(dAtA []byte) error {
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NonIndexedLabels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.NonIndexedLabels = append(m.NonIndexedLabels, labels.Label{})
if err := (*LabelAdapter)(&m.NonIndexedLabels[len(m.NonIndexedLabels)-1]).Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
@ -366,6 +458,131 @@ func (m *Entry) Unmarshal(dAtA []byte) error {
return nil
}
// Unmarshal a LabelAdapter, implements proto.Unmarshaller.
// NB this is a copy of the autogenerated code to unmarshal a LabelPair,
// with the byte copying replaced with a yoloString.
func (m *LabelAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LabelPairAdapter: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPairAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func yoloString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
func (m *Stream) Size() (n int) {
if m == nil {
return 0
@ -400,6 +617,29 @@ func (m *Entry) Size() (n int) {
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
if len(m.NonIndexedLabels) > 0 {
for _, e := range m.NonIndexedLabels {
l = (*LabelAdapter)(&e).Size()
n += 1 + l + sovPush(uint64(l))
}
}
return n
}
func (m *LabelAdapter) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
return n
}
@ -461,5 +701,23 @@ func (m *Entry) Equal(that interface{}) bool {
if m.Line != that1.Line {
return false
}
for i := range m.NonIndexedLabels {
if !(*LabelAdapter)(&m.NonIndexedLabels[i]).Equal(LabelAdapter(that1.NonIndexedLabels[i])) {
return false
}
}
return true
}
// Equal implements proto.Equaler.
func (m *LabelAdapter) Equal(other LabelAdapter) bool {
return m.Name == other.Name && m.Value == other.Value
}
// Compare implements proto.Comparer.
func (m *LabelAdapter) Compare(other LabelAdapter) int {
if c := strings.Compare(m.Name, other.Name); c != 0 {
return c
}
return strings.Compare(m.Value, other.Value)
}

@ -14,20 +14,20 @@ var (
Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`,
Hash: 1234*10 ^ 9,
Entries: []Entry{
{now, line},
{now.Add(1 * time.Second), line},
{now.Add(2 * time.Second), line},
{now.Add(3 * time.Second), line},
{now, line, ""},
{now.Add(1 * time.Second), line, `{traceID="1234"}`},
{now.Add(2 * time.Second), line, ""},
{now.Add(3 * time.Second), line, `{user="abc"}`},
},
}
streamAdapter = StreamAdapter{
Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`,
Hash: 1234*10 ^ 9,
Entries: []EntryAdapter{
{now, line},
{now.Add(1 * time.Second), line},
{now.Add(2 * time.Second), line},
{now.Add(3 * time.Second), line},
{now, line, ""},
{now.Add(1 * time.Second), line, `{traceID="1234"}`},
{now.Add(2 * time.Second), line, ""},
{now.Add(3 * time.Second), line, `{user="abc"}`},
},
}
)

@ -1479,7 +1479,7 @@ var (
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
[ "123456789012345", "super line"]
]
},
{
@ -1487,7 +1487,7 @@ var (
"test": "test2"
},
"values":[
[ "123456789012346", "super line2" ]
[ "123456789012346", "super line2"]
]
}
]

@ -7,6 +7,7 @@ import (
"time"
json "github.com/json-iterator/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
loghttp "github.com/grafana/loki/pkg/loghttp/legacy"
@ -27,6 +28,14 @@ var queryTests = []struct {
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"),
Line: "super line",
},
{
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:23.380001319Z"),
Line: "super line with labels",
NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
},
},
},
Labels: `{test="test"}`,
},
@ -39,6 +48,14 @@ var queryTests = []struct {
{
"ts": "2019-09-13T18:32:22.380001319Z",
"line": "super line"
},
{
"ts": "2019-09-13T18:32:23.380001319Z",
"line": "super line with labels",
"nonIndexedLabels": {
"foo": "a",
"bar": "b"
}
}
]
}
@ -164,6 +181,14 @@ var tailTests = []struct {
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"),
Line: "super line",
},
{
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:23.380001319Z"),
Line: "super line with labels",
NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
},
},
},
Labels: "{test=\"test\"}",
},
@ -183,6 +208,14 @@ var tailTests = []struct {
{
"ts": "2019-09-13T18:32:22.380001319Z",
"line": "super line"
},
{
"ts": "2019-09-13T18:32:23.380001319Z",
"line": "super line with labels",
"nonIndexedLabels": {
"foo": "a",
"bar": "b"
}
}
]
}

@ -34,6 +34,14 @@ var queryTests = []struct {
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
{
Timestamp: time.Unix(0, 123456789012346),
Line: "super line with labels",
NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
},
},
},
Labels: `{test="test"}`,
},
@ -48,7 +56,8 @@ var queryTests = []struct {
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
[ "123456789012345", "super line"],
[ "123456789012346", "super line with labels", { "foo": "a", "bar": "b" } ]
]
}
],
@ -488,6 +497,14 @@ var tailTests = []struct {
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
{
Timestamp: time.Unix(0, 123456789012346),
Line: "super line with labels",
NonIndexedLabels: labels.Labels{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
},
},
},
Labels: "{test=\"test\"}",
},
@ -506,7 +523,8 @@ var tailTests = []struct {
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
[ "123456789012345", "super line"],
[ "123456789012346", "super line with labels", { "foo": "a", "bar": "b" } ]
]
}
],

@ -3,6 +3,7 @@ package marshal
import (
"fmt"
"strconv"
"unsafe"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
@ -90,26 +91,20 @@ func NewStream(s logproto.Stream) (loghttp.Stream, error) {
return loghttp.Stream{}, errors.Wrapf(err, "err while creating labelset for %s", s.Labels)
}
ret := loghttp.Stream{
Labels: labels,
Entries: make([]loghttp.Entry, len(s.Entries)),
// Avoid a nil entries slice to be consistent with the decoding
entries := []loghttp.Entry{}
if len(s.Entries) > 0 {
entries = *(*[]loghttp.Entry)(unsafe.Pointer(&s.Entries))
}
for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
ret := loghttp.Stream{
Labels: labels,
Entries: entries,
}
return ret, nil
}
// NewEntry constructs an Entry from a logproto.Entry
func NewEntry(e logproto.Entry) loghttp.Entry {
return loghttp.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
}
}
func NewScalar(s promql.Scalar) loghttp.Scalar {
return loghttp.Scalar{
Timestamp: model.Time(s.T),
@ -315,6 +310,18 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream) error {
s.WriteRaw(`"`)
s.WriteMore()
s.WriteStringWithHTMLEscaped(e.Line)
if len(e.NonIndexedLabels) > 0 {
s.WriteMore()
s.WriteObjectStart()
for i, lbl := range e.NonIndexedLabels {
if i > 0 {
s.WriteMore()
}
s.WriteObjectField(lbl.Name)
s.WriteString(lbl.Value)
}
s.WriteObjectEnd()
}
s.WriteArrayEnd()
s.Flush()

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
@ -25,6 +26,14 @@ var pushTests = []struct {
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:22.380001319Z"),
Line: "super line",
},
{
Timestamp: mustParse(time.RFC3339Nano, "2019-09-13T18:32:23.380001319Z"),
Line: "super line with labels",
NonIndexedLabels: labels.Labels{
{Name: "a", Value: "1"},
{Name: "b", Value: "2"},
},
},
},
Labels: `{test="test"}`,
},
@ -37,6 +46,14 @@ var pushTests = []struct {
{
"ts": "2019-09-13T18:32:22.380001319Z",
"line": "super line"
},
{
"ts": "2019-09-13T18:32:23.380001319Z",
"line": "super line with labels",
"nonIndexedLabels": {
"a": "1",
"b": "2"
}
}
]
}

@ -17,30 +17,12 @@ func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
if err := jsoniter.NewDecoder(b).Decode(&request); err != nil {
return err
}
*r = NewPushRequest(request)
return nil
}
// NewPushRequest constructs a logproto.PushRequest from a PushRequest
func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest {
ret := logproto.PushRequest{
Streams: make([]logproto.Stream, len(r.Streams)),
}
for i, s := range r.Streams {
ret.Streams[i] = NewStream(s)
*r = logproto.PushRequest{
Streams: *(*[]logproto.Stream)(unsafe.Pointer(&request.Streams)),
}
return ret
}
// NewStream constructs a logproto.Stream from a Stream
func NewStream(s *loghttp.Stream) logproto.Stream {
return logproto.Stream{
Entries: *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)),
Labels: s.Labels.String(),
}
return nil
}
// WebsocketReader knows how to read message to a websocket connection.

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/loghttp"
@ -45,6 +46,35 @@ var pushTests = []struct {
]
}`,
},
{
[]logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
NonIndexedLabels: labels.Labels{
{Name: "a", Value: "1"},
{Name: "b", Value: "2"},
},
},
},
Labels: `{test="test"}`,
},
},
`{
"streams": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line", { "a": "1", "b": "2" } ]
]
}
]
}`,
},
}
func Test_DecodePushRequest(t *testing.T) {

@ -164,15 +164,67 @@ func (m *StreamAdapter) GetHash() uint64 {
return 0
}
type LabelPairAdapter struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *LabelPairAdapter) Reset() { *m = LabelPairAdapter{} }
func (*LabelPairAdapter) ProtoMessage() {}
func (*LabelPairAdapter) Descriptor() ([]byte, []int) {
return fileDescriptor_35ec442956852c9e, []int{3}
}
func (m *LabelPairAdapter) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *LabelPairAdapter) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_LabelPairAdapter.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *LabelPairAdapter) XXX_Merge(src proto.Message) {
xxx_messageInfo_LabelPairAdapter.Merge(m, src)
}
func (m *LabelPairAdapter) XXX_Size() int {
return m.Size()
}
func (m *LabelPairAdapter) XXX_DiscardUnknown() {
xxx_messageInfo_LabelPairAdapter.DiscardUnknown(m)
}
var xxx_messageInfo_LabelPairAdapter proto.InternalMessageInfo
func (m *LabelPairAdapter) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *LabelPairAdapter) GetValue() string {
if m != nil {
return m.Value
}
return ""
}
type EntryAdapter struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
NonIndexedLabels []LabelPairAdapter `protobuf:"bytes,3,rep,name=nonIndexedLabels,proto3" json:"nonIndexedLabels,omitempty"`
}
func (m *EntryAdapter) Reset() { *m = EntryAdapter{} }
func (*EntryAdapter) ProtoMessage() {}
func (*EntryAdapter) Descriptor() ([]byte, []int) {
return fileDescriptor_35ec442956852c9e, []int{3}
return fileDescriptor_35ec442956852c9e, []int{4}
}
func (m *EntryAdapter) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -215,44 +267,57 @@ func (m *EntryAdapter) GetLine() string {
return ""
}
func (m *EntryAdapter) GetNonIndexedLabels() []LabelPairAdapter {
if m != nil {
return m.NonIndexedLabels
}
return nil
}
func init() {
proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest")
proto.RegisterType((*PushResponse)(nil), "logproto.PushResponse")
proto.RegisterType((*StreamAdapter)(nil), "logproto.StreamAdapter")
proto.RegisterType((*LabelPairAdapter)(nil), "logproto.LabelPairAdapter")
proto.RegisterType((*EntryAdapter)(nil), "logproto.EntryAdapter")
}
func init() { proto.RegisterFile("pkg/push/push.proto", fileDescriptor_35ec442956852c9e) }
var fileDescriptor_35ec442956852c9e = []byte{
// 422 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x52, 0x41, 0x6f, 0xd3, 0x30,
0x18, 0xb5, 0xbb, 0xd2, 0x6d, 0xee, 0x18, 0x92, 0x61, 0xa3, 0x44, 0xc8, 0xae, 0x72, 0xea, 0x85,
0x44, 0x2a, 0x07, 0xce, 0x8d, 0x84, 0xb4, 0x23, 0x0a, 0x08, 0x24, 0x6e, 0x0e, 0x78, 0x4e, 0xb4,
0x24, 0x0e, 0xb1, 0x83, 0xc4, 0x8d, 0x9f, 0x30, 0xfe, 0x05, 0x3f, 0x65, 0xc7, 0x1e, 0x27, 0x0e,
0x81, 0xa6, 0x17, 0x94, 0xd3, 0x7e, 0x02, 0x8a, 0x13, 0xd3, 0xc2, 0xc5, 0x79, 0x7e, 0xfe, 0xfc,
0xbd, 0xf7, 0xbd, 0x18, 0x3d, 0x2c, 0xae, 0x84, 0x5f, 0x54, 0x2a, 0x36, 0x8b, 0x57, 0x94, 0x52,
0x4b, 0x7c, 0x94, 0x4a, 0x61, 0x90, 0xf3, 0x48, 0x48, 0x21, 0x0d, 0xf4, 0x3b, 0xd4, 0x9f, 0x3b,
0x54, 0x48, 0x29, 0x52, 0xee, 0x9b, 0x5d, 0x54, 0x5d, 0xfa, 0x3a, 0xc9, 0xb8, 0xd2, 0x2c, 0x2b,
0xfa, 0x02, 0xf7, 0x1d, 0x9a, 0xbe, 0xaa, 0x54, 0x1c, 0xf2, 0x4f, 0x15, 0x57, 0x1a, 0x5f, 0xa0,
0x43, 0xa5, 0x4b, 0xce, 0x32, 0x35, 0x83, 0xf3, 0x83, 0xc5, 0x74, 0xf9, 0xd8, 0xb3, 0x0a, 0xde,
0x6b, 0x73, 0xb0, 0xfa, 0xc8, 0x0a, 0xcd, 0xcb, 0xe0, 0xec, 0x47, 0x4d, 0x27, 0x3d, 0xd5, 0xd6,
0xd4, 0xde, 0x0a, 0x2d, 0x70, 0x4f, 0xd1, 0x49, 0xdf, 0x58, 0x15, 0x32, 0x57, 0xdc, 0xfd, 0x06,
0xd1, 0xfd, 0x7f, 0x3a, 0x60, 0x17, 0x4d, 0x52, 0x16, 0xf1, 0xb4, 0x93, 0x82, 0x8b, 0xe3, 0x00,
0xb5, 0x35, 0x1d, 0x98, 0x70, 0xf8, 0xe2, 0x15, 0x3a, 0xe4, 0xb9, 0x2e, 0x13, 0xae, 0x66, 0x23,
0xe3, 0xe7, 0x7c, 0xe7, 0xe7, 0x65, 0xae, 0xcb, 0x2f, 0xd6, 0xce, 0x83, 0x9b, 0x9a, 0x82, 0xce,
0xc8, 0x50, 0x1e, 0x5a, 0x80, 0x9f, 0xa0, 0x71, 0xcc, 0x54, 0x3c, 0x3b, 0x98, 0xc3, 0xc5, 0x38,
0xb8, 0xd7, 0xd6, 0x14, 0x3e, 0x0b, 0x0d, 0xe5, 0x7e, 0x46, 0x27, 0xfb, 0x4d, 0xf0, 0x05, 0x3a,
0xfe, 0x9b, 0x8f, 0x31, 0x35, 0x5d, 0x3a, 0x5e, 0x9f, 0xa0, 0x67, 0x13, 0xf4, 0xde, 0xd8, 0x8a,
0xe0, 0x74, 0xd0, 0x1c, 0x69, 0x75, 0xfd, 0x93, 0xc2, 0x70, 0x77, 0x19, 0x3f, 0x45, 0xe3, 0x34,
0xc9, 0xf9, 0x6c, 0x64, 0x26, 0x3b, 0x6a, 0x6b, 0x6a, 0xf6, 0xa1, 0x59, 0x97, 0x2b, 0x34, 0xe9,
0xb2, 0xe1, 0x25, 0x7e, 0x81, 0xc6, 0x1d, 0xc2, 0x67, 0xbb, 0xb1, 0xf6, 0x7e, 0x87, 0x73, 0xfe,
0x3f, 0x3d, 0x84, 0x09, 0x82, 0xb7, 0xeb, 0x0d, 0x01, 0xb7, 0x1b, 0x02, 0xee, 0x36, 0x04, 0x7e,
0x6d, 0x08, 0xfc, 0xde, 0x10, 0x78, 0xd3, 0x10, 0xb8, 0x6e, 0x08, 0xfc, 0xd5, 0x10, 0xf8, 0xbb,
0x21, 0xe0, 0xae, 0x21, 0xf0, 0x7a, 0x4b, 0xc0, 0x7a, 0x4b, 0xc0, 0xed, 0x96, 0x80, 0xf7, 0x73,
0x91, 0xe8, 0xb8, 0x8a, 0xbc, 0x0f, 0x32, 0xf3, 0x45, 0xc9, 0x2e, 0x59, 0xce, 0xfc, 0x54, 0x5e,
0x25, 0xbe, 0x7d, 0x5b, 0xd1, 0xc4, 0xa8, 0x3d, 0xff, 0x13, 0x00, 0x00, 0xff, 0xff, 0x5c, 0x30,
0xfc, 0xe9, 0x6e, 0x02, 0x00, 0x00,
// 498 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x53, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x6e, 0xda, 0x4e, 0x4a, 0xa9, 0x96, 0xb6, 0x04, 0x0b, 0xad, 0x23, 0x9f, 0x72,
0x00, 0x5b, 0x0a, 0x07, 0x2e, 0x5c, 0x62, 0x09, 0xa9, 0x48, 0x3d, 0x54, 0x06, 0x81, 0xc4, 0x6d,
0x43, 0xb6, 0xb6, 0xa9, 0xed, 0x35, 0xde, 0x35, 0xa2, 0x37, 0x3e, 0xa1, 0xfc, 0x05, 0x9f, 0xd2,
0x63, 0x8e, 0x15, 0x07, 0x43, 0x9c, 0x0b, 0xca, 0xa9, 0x9f, 0x80, 0xbc, 0xb6, 0x49, 0x09, 0x97,
0xf5, 0x9b, 0xb7, 0x33, 0xf3, 0x9e, 0x67, 0x6c, 0x78, 0x90, 0x5e, 0xf8, 0x4e, 0x9a, 0x8b, 0x40,
0x1d, 0x76, 0x9a, 0x71, 0xc9, 0xf1, 0x4e, 0xc4, 0x7d, 0x85, 0x8c, 0x43, 0x9f, 0xfb, 0x5c, 0x41,
0xa7, 0x42, 0xf5, 0xbd, 0x61, 0xfa, 0x9c, 0xfb, 0x11, 0x73, 0x54, 0x34, 0xcd, 0xcf, 0x1d, 0x19,
0xc6, 0x4c, 0x48, 0x1a, 0xa7, 0x75, 0x82, 0xf5, 0x0e, 0xfa, 0x67, 0xb9, 0x08, 0x3c, 0xf6, 0x29,
0x67, 0x42, 0xe2, 0x13, 0xd8, 0x16, 0x32, 0x63, 0x34, 0x16, 0x03, 0x34, 0xec, 0x8e, 0xfa, 0xe3,
0x87, 0x76, 0xab, 0x60, 0xbf, 0x56, 0x17, 0x93, 0x19, 0x4d, 0x25, 0xcb, 0xdc, 0xa3, 0x1f, 0x85,
0xd9, 0xab, 0xa9, 0x55, 0x61, 0xb6, 0x55, 0x5e, 0x0b, 0xac, 0x7d, 0xd8, 0xab, 0x1b, 0x8b, 0x94,
0x27, 0x82, 0x59, 0xdf, 0x10, 0xdc, 0xfb, 0xa7, 0x03, 0xb6, 0xa0, 0x17, 0xd1, 0x29, 0x8b, 0x2a,
0x29, 0x34, 0xda, 0x75, 0x61, 0x55, 0x98, 0x0d, 0xe3, 0x35, 0x4f, 0x3c, 0x81, 0x6d, 0x96, 0xc8,
0x2c, 0x64, 0x62, 0xd0, 0x51, 0x7e, 0x8e, 0xd7, 0x7e, 0x5e, 0x26, 0x32, 0xbb, 0x6c, 0xed, 0xdc,
0xbf, 0x2e, 0x4c, 0xad, 0x32, 0xd2, 0xa4, 0x7b, 0x2d, 0xc0, 0x8f, 0x40, 0x0f, 0xa8, 0x08, 0x06,
0xdd, 0x21, 0x1a, 0xe9, 0xee, 0xd6, 0xaa, 0x30, 0xd1, 0x53, 0x4f, 0x51, 0xd6, 0x0b, 0x38, 0x38,
0xad, 0x74, 0xce, 0x68, 0x98, 0xb5, 0xae, 0x30, 0xe8, 0x09, 0x8d, 0x59, 0xed, 0xc9, 0x53, 0x18,
0x1f, 0xc2, 0xd6, 0x67, 0x1a, 0xe5, 0x6c, 0xd0, 0x51, 0x64, 0x1d, 0x58, 0x25, 0x82, 0xbd, 0xbb,
0x1e, 0xf0, 0x09, 0xec, 0xfe, 0x1d, 0xaf, 0xaa, 0xef, 0x8f, 0x0d, 0xbb, 0x5e, 0x80, 0xdd, 0x2e,
0xc0, 0x7e, 0xd3, 0x66, 0xb8, 0xfb, 0x8d, 0xe5, 0x8e, 0x14, 0x57, 0x3f, 0x4d, 0xe4, 0xad, 0x8b,
0xf1, 0x63, 0xd0, 0xa3, 0x30, 0x69, 0xf4, 0xdc, 0x9d, 0x55, 0x61, 0xaa, 0xd8, 0x53, 0x27, 0xfe,
0x08, 0x07, 0x09, 0x4f, 0x5e, 0x25, 0x33, 0xf6, 0x85, 0xcd, 0x4e, 0xeb, 0x11, 0x76, 0xd5, 0x74,
0x8c, 0xf5, 0x74, 0x36, 0x5f, 0xcc, 0xb5, 0x1a, 0x39, 0x63, 0xb3, 0xf6, 0x09, 0x8f, 0x43, 0xc9,
0xe2, 0x54, 0x5e, 0x7a, 0xff, 0xf5, 0x1d, 0x4f, 0xa0, 0x57, 0xad, 0x91, 0x65, 0xf8, 0x39, 0xe8,
0x15, 0xc2, 0x47, 0x6b, 0x8d, 0x3b, 0x5f, 0x8e, 0x71, 0xbc, 0x49, 0x37, 0x7b, 0xd7, 0xdc, 0xb7,
0xf3, 0x05, 0xd1, 0x6e, 0x16, 0x44, 0xbb, 0x5d, 0x10, 0xf4, 0xb5, 0x24, 0xe8, 0x7b, 0x49, 0xd0,
0x75, 0x49, 0xd0, 0xbc, 0x24, 0xe8, 0x57, 0x49, 0xd0, 0xef, 0x92, 0x68, 0xb7, 0x25, 0x41, 0x57,
0x4b, 0xa2, 0xcd, 0x97, 0x44, 0xbb, 0x59, 0x12, 0xed, 0xfd, 0xd0, 0x0f, 0x65, 0x90, 0x4f, 0xed,
0x0f, 0x3c, 0x76, 0xfc, 0x8c, 0x9e, 0xd3, 0x84, 0x3a, 0x11, 0xbf, 0x08, 0x9d, 0xf6, 0x37, 0x98,
0xf6, 0x94, 0xda, 0xb3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9b, 0x6f, 0x19, 0xc8, 0x19, 0x03,
0x00, 0x00,
}
func (this *PushRequest) Equal(that interface{}) bool {
@ -340,6 +405,33 @@ func (this *StreamAdapter) Equal(that interface{}) bool {
}
return true
}
func (this *LabelPairAdapter) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*LabelPairAdapter)
if !ok {
that2, ok := that.(LabelPairAdapter)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Name != that1.Name {
return false
}
if this.Value != that1.Value {
return false
}
return true
}
func (this *EntryAdapter) Equal(that interface{}) bool {
if that == nil {
return this == nil
@ -365,6 +457,14 @@ func (this *EntryAdapter) Equal(that interface{}) bool {
if this.Line != that1.Line {
return false
}
if len(this.NonIndexedLabels) != len(that1.NonIndexedLabels) {
return false
}
for i := range this.NonIndexedLabels {
if !this.NonIndexedLabels[i].Equal(&that1.NonIndexedLabels[i]) {
return false
}
}
return true
}
func (this *PushRequest) GoString() string {
@ -404,14 +504,32 @@ func (this *StreamAdapter) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *EntryAdapter) GoString() string {
func (this *LabelPairAdapter) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s = append(s, "&push.LabelPairAdapter{")
s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *EntryAdapter) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&push.EntryAdapter{")
s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n")
s = append(s, "Line: "+fmt.Sprintf("%#v", this.Line)+",\n")
if this.NonIndexedLabels != nil {
vs := make([]*LabelPairAdapter, len(this.NonIndexedLabels))
for i := range vs {
vs[i] = &this.NonIndexedLabels[i]
}
s = append(s, "NonIndexedLabels: "+fmt.Sprintf("%#v", vs)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
@ -613,6 +731,43 @@ func (m *StreamAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *LabelPairAdapter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LabelPairAdapter) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LabelPairAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintPush(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintPush(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *EntryAdapter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -633,6 +788,20 @@ func (m *EntryAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.NonIndexedLabels) > 0 {
for iNdEx := len(m.NonIndexedLabels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.NonIndexedLabels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Line) > 0 {
i -= len(m.Line)
copy(dAtA[i:], m.Line)
@ -708,6 +877,23 @@ func (m *StreamAdapter) Size() (n int) {
return n
}
func (m *LabelPairAdapter) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
return n
}
func (m *EntryAdapter) Size() (n int) {
if m == nil {
return 0
@ -720,6 +906,12 @@ func (m *EntryAdapter) Size() (n int) {
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
if len(m.NonIndexedLabels) > 0 {
for _, e := range m.NonIndexedLabels {
l = e.Size()
n += 1 + l + sovPush(uint64(l))
}
}
return n
}
@ -765,13 +957,30 @@ func (this *StreamAdapter) String() string {
}, "")
return s
}
func (this *LabelPairAdapter) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&LabelPairAdapter{`,
`Name:` + fmt.Sprintf("%v", this.Name) + `,`,
`Value:` + fmt.Sprintf("%v", this.Value) + `,`,
`}`,
}, "")
return s
}
func (this *EntryAdapter) String() string {
if this == nil {
return "nil"
}
repeatedStringForNonIndexedLabels := "[]LabelPairAdapter{"
for _, f := range this.NonIndexedLabels {
repeatedStringForNonIndexedLabels += strings.Replace(strings.Replace(f.String(), "LabelPairAdapter", "LabelPairAdapter", 1), `&`, ``, 1) + ","
}
repeatedStringForNonIndexedLabels += "}"
s := strings.Join([]string{`&EntryAdapter{`,
`Timestamp:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timestamp), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Line:` + fmt.Sprintf("%v", this.Line) + `,`,
`NonIndexedLabels:` + repeatedStringForNonIndexedLabels + `,`,
`}`,
}, "")
return s
@ -1062,6 +1271,123 @@ func (m *StreamAdapter) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *LabelPairAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LabelPairAdapter: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPairAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *EntryAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1156,6 +1482,40 @@ func (m *EntryAdapter) Unmarshal(dAtA []byte) error {
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NonIndexedLabels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.NonIndexedLabels = append(m.NonIndexedLabels, LabelPairAdapter{})
if err := m.NonIndexedLabels[len(m.NonIndexedLabels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])

@ -30,6 +30,11 @@ message StreamAdapter {
uint64 hash = 3 [(gogoproto.jsontag) = "-"];
}
message LabelPairAdapter {
string name = 1;
string value = 2;
}
message EntryAdapter {
google.protobuf.Timestamp timestamp = 1 [
(gogoproto.stdtime) = true,
@ -37,4 +42,8 @@ message EntryAdapter {
(gogoproto.jsontag) = "ts"
];
string line = 2 [(gogoproto.jsontag) = "line"];
repeated LabelPairAdapter nonIndexedLabels = 3 [
(gogoproto.nullable) = false,
(gogoproto.jsontag) = "nonIndexedLabels,omitempty"
];
}

@ -3,7 +3,11 @@ package push
import (
"fmt"
"io"
"strings"
"time"
"unsafe"
"github.com/prometheus/prometheus/model/labels"
)
// Stream contains a unique labels set as a string and a set of entries for it.
@ -17,8 +21,48 @@ type Stream struct {
// Entry is a log entry with a timestamp.
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
NonIndexedLabels labels.Labels `protobuf:"bytes,3,opt,name=nonIndexedLabels,proto3" json:"nonIndexedLabels,omitempty"`
}
type LabelAdapter labels.Label
func (m *LabelAdapter) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LabelAdapter) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LabelAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintPush(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintPush(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *Stream) Marshal() (dAtA []byte, err error) {
@ -90,6 +134,20 @@ func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.NonIndexedLabels) > 0 {
for iNdEx := len(m.NonIndexedLabels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := (*LabelAdapter)(&m.NonIndexedLabels[iNdEx]).MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintPush(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Line) > 0 {
i -= len(m.Line)
copy(dAtA[i:], m.Line)
@ -139,7 +197,7 @@ func (m *Stream) Unmarshal(dAtA []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field NonIndexedLabels", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
@ -341,6 +399,40 @@ func (m *Entry) Unmarshal(dAtA []byte) error {
}
m.Line = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NonIndexedLabels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.NonIndexedLabels = append(m.NonIndexedLabels, labels.Label{})
if err := (*LabelAdapter)(&m.NonIndexedLabels[len(m.NonIndexedLabels)-1]).Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
@ -366,6 +458,131 @@ func (m *Entry) Unmarshal(dAtA []byte) error {
return nil
}
// Unmarshal a LabelAdapter, implements proto.Unmarshaller.
// NB this is a copy of the autogenerated code to unmarshal a LabelPair,
// with the byte copying replaced with a yoloString.
func (m *LabelAdapter) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LabelPairAdapter: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelPairAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPush
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPush
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthPush
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = yoloString(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPush(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPush
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func yoloString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
func (m *Stream) Size() (n int) {
if m == nil {
return 0
@ -400,6 +617,29 @@ func (m *Entry) Size() (n int) {
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
if len(m.NonIndexedLabels) > 0 {
for _, e := range m.NonIndexedLabels {
l = (*LabelAdapter)(&e).Size()
n += 1 + l + sovPush(uint64(l))
}
}
return n
}
func (m *LabelAdapter) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovPush(uint64(l))
}
return n
}
@ -461,5 +701,23 @@ func (m *Entry) Equal(that interface{}) bool {
if m.Line != that1.Line {
return false
}
for i := range m.NonIndexedLabels {
if !(*LabelAdapter)(&m.NonIndexedLabels[i]).Equal(LabelAdapter(that1.NonIndexedLabels[i])) {
return false
}
}
return true
}
// Equal implements proto.Equaler.
func (m *LabelAdapter) Equal(other LabelAdapter) bool {
return m.Name == other.Name && m.Value == other.Value
}
// Compare implements proto.Comparer.
func (m *LabelAdapter) Compare(other LabelAdapter) int {
if c := strings.Compare(m.Name, other.Name); c != 0 {
return c
}
return strings.Compare(m.Value, other.Value)
}

Loading…
Cancel
Save