mirror of https://github.com/grafana/loki
Logproto: Extract push.proto from logproto package to the separate module (#8259)
Co-authored-by: Owen Diehl <ow.diehl@gmail.com>pull/8304/head
parent
5993640329
commit
4cd1246b88
@ -0,0 +1,23 @@ |
||||
package logproto |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/pkg/push" |
||||
"google.golang.org/grpc" |
||||
) |
||||
|
||||
// Aliases to avoid renaming all the imports of logproto
|
||||
|
||||
type Entry = push.Entry |
||||
type Stream = push.Stream |
||||
type PushRequest = push.PushRequest |
||||
type PushResponse = push.PushResponse |
||||
type PusherClient = push.PusherClient |
||||
type PusherServer = push.PusherServer |
||||
|
||||
func NewPusherClient(cc *grpc.ClientConn) PusherClient { |
||||
return push.NewPusherClient(cc) |
||||
} |
||||
|
||||
func RegisterPusherServer(s *grpc.Server, srv PusherServer) { |
||||
push.RegisterPusherServer(s, srv) |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -1,471 +1,9 @@ |
||||
package logproto |
||||
|
||||
import ( |
||||
fmt "fmt" |
||||
io "io" |
||||
"time" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
) |
||||
|
||||
// Stream contains a unique labels set as a string and a set of entries for it.
|
||||
// We are not using the proto generated version but this custom one so that we
|
||||
// can improve serialization see benchmark.
|
||||
type Stream struct { |
||||
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` |
||||
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` |
||||
Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` |
||||
} |
||||
|
||||
// Entry is a log entry with a timestamp.
|
||||
type Entry struct { |
||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` |
||||
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` |
||||
} |
||||
|
||||
func (m *Stream) Marshal() (dAtA []byte, err error) { |
||||
size := m.Size() |
||||
dAtA = make([]byte, size) |
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return dAtA[:n], nil |
||||
} |
||||
|
||||
func (m *Stream) MarshalTo(dAtA []byte) (int, error) { |
||||
size := m.Size() |
||||
return m.MarshalToSizedBuffer(dAtA[:size]) |
||||
} |
||||
|
||||
func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) { |
||||
i := len(dAtA) |
||||
_ = i |
||||
var l int |
||||
_ = l |
||||
if m.Hash != 0 { |
||||
i = encodeVarintLogproto(dAtA, i, m.Hash) |
||||
i-- |
||||
dAtA[i] = 0x18 |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { |
||||
{ |
||||
size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
i -= size |
||||
i = encodeVarintLogproto(dAtA, i, uint64(size)) |
||||
} |
||||
i-- |
||||
dAtA[i] = 0x12 |
||||
} |
||||
} |
||||
if len(m.Labels) > 0 { |
||||
i -= len(m.Labels) |
||||
copy(dAtA[i:], m.Labels) |
||||
i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels))) |
||||
i-- |
||||
dAtA[i] = 0xa |
||||
} |
||||
return len(dAtA) - i, nil |
||||
} |
||||
|
||||
func (m *Entry) Marshal() (dAtA []byte, err error) { |
||||
size := m.Size() |
||||
dAtA = make([]byte, size) |
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return dAtA[:n], nil |
||||
} |
||||
|
||||
func (m *Entry) MarshalTo(dAtA []byte) (int, error) { |
||||
size := m.Size() |
||||
return m.MarshalToSizedBuffer(dAtA[:size]) |
||||
} |
||||
|
||||
func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) { |
||||
i := len(dAtA) |
||||
_ = i |
||||
var l int |
||||
_ = l |
||||
if len(m.Line) > 0 { |
||||
i -= len(m.Line) |
||||
copy(dAtA[i:], m.Line) |
||||
i = encodeVarintLogproto(dAtA, i, uint64(len(m.Line))) |
||||
i-- |
||||
dAtA[i] = 0x12 |
||||
} |
||||
n7, err7 := StdTimeMarshalTo(m.Timestamp, dAtA[i-SizeOfStdTime(m.Timestamp):]) |
||||
if err7 != nil { |
||||
return 0, err7 |
||||
} |
||||
i -= n7 |
||||
i = encodeVarintLogproto(dAtA, i, uint64(n7)) |
||||
i-- |
||||
dAtA[i] = 0xa |
||||
return len(dAtA) - i, nil |
||||
} |
||||
|
||||
func (m *Stream) Unmarshal(dAtA []byte) error { |
||||
l := len(dAtA) |
||||
iNdEx := 0 |
||||
for iNdEx < l { |
||||
preIndex := iNdEx |
||||
var wire uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
wire |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
fieldNum := int32(wire >> 3) |
||||
wireType := int(wire & 0x7) |
||||
if wireType == 4 { |
||||
return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group") |
||||
} |
||||
if fieldNum <= 0 { |
||||
return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire) |
||||
} |
||||
switch fieldNum { |
||||
case 1: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) |
||||
} |
||||
var stringLen uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
stringLen |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
intStringLen := int(stringLen) |
||||
if intStringLen < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
postIndex := iNdEx + intStringLen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Labels = string(dAtA[iNdEx:postIndex]) |
||||
iNdEx = postIndex |
||||
case 2: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) |
||||
} |
||||
var msglen int |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
msglen |= int(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
if msglen < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
postIndex := iNdEx + msglen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Entries = append(m.Entries, Entry{}) |
||||
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { |
||||
return err |
||||
} |
||||
iNdEx = postIndex |
||||
case 3: |
||||
if wireType != 0 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) |
||||
} |
||||
m.Hash = 0 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
m.Hash |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
default: |
||||
iNdEx = preIndex |
||||
skippy, err := skipLogproto(dAtA[iNdEx:]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if skippy < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if (iNdEx + skippy) < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if (iNdEx + skippy) > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
iNdEx += skippy |
||||
} |
||||
} |
||||
|
||||
if iNdEx > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Entry) Unmarshal(dAtA []byte) error { |
||||
l := len(dAtA) |
||||
iNdEx := 0 |
||||
for iNdEx < l { |
||||
preIndex := iNdEx |
||||
var wire uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
wire |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
fieldNum := int32(wire >> 3) |
||||
wireType := int(wire & 0x7) |
||||
if wireType == 4 { |
||||
return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group") |
||||
} |
||||
if fieldNum <= 0 { |
||||
return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire) |
||||
} |
||||
switch fieldNum { |
||||
case 1: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) |
||||
} |
||||
var msglen int |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
msglen |= int(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
if msglen < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
postIndex := iNdEx + msglen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { |
||||
return err |
||||
} |
||||
iNdEx = postIndex |
||||
case 2: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) |
||||
} |
||||
var stringLen uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowLogproto |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
stringLen |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
intStringLen := int(stringLen) |
||||
if intStringLen < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
postIndex := iNdEx + intStringLen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Line = string(dAtA[iNdEx:postIndex]) |
||||
iNdEx = postIndex |
||||
default: |
||||
iNdEx = preIndex |
||||
skippy, err := skipLogproto(dAtA[iNdEx:]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if skippy < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if (iNdEx + skippy) < 0 { |
||||
return ErrInvalidLengthLogproto |
||||
} |
||||
if (iNdEx + skippy) > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
iNdEx += skippy |
||||
} |
||||
} |
||||
|
||||
if iNdEx > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Stream) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = len(m.Labels) |
||||
if l > 0 { |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for _, e := range m.Entries { |
||||
l = e.Size() |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
} |
||||
} |
||||
if m.Hash != 0 { |
||||
n += 1 + sovLogproto(m.Hash) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Entry) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = SizeOfStdTime(m.Timestamp) |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
l = len(m.Line) |
||||
if l > 0 { |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Stream) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Stream) |
||||
if !ok { |
||||
that2, ok := that.(Stream) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if m.Labels != that1.Labels { |
||||
return false |
||||
} |
||||
if len(m.Entries) != len(that1.Entries) { |
||||
return false |
||||
} |
||||
for i := range m.Entries { |
||||
if !m.Entries[i].Equal(that1.Entries[i]) { |
||||
return false |
||||
} |
||||
} |
||||
return m.Hash == that1.Hash |
||||
} |
||||
|
||||
func (m *Entry) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Entry) |
||||
if !ok { |
||||
that2, ok := that.(Entry) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if !m.Timestamp.Equal(that1.Timestamp) { |
||||
return false |
||||
} |
||||
if m.Line != that1.Line { |
||||
return false |
||||
} |
||||
return true |
||||
} |
||||
|
||||
func (c *ChunkRef) FingerprintModel() model.Fingerprint { |
||||
return model.Fingerprint(c.Fingerprint) |
||||
} |
||||
|
||||
@ -0,0 +1,23 @@ |
||||
module github.com/grafana/loki/pkg/push |
||||
|
||||
go 1.19 |
||||
|
||||
require ( |
||||
github.com/gogo/protobuf v1.3.2 |
||||
github.com/stretchr/testify v1.8.1 |
||||
google.golang.org/grpc v1.52.0 |
||||
) |
||||
|
||||
require ( |
||||
github.com/davecgh/go-spew v1.1.1 // indirect |
||||
github.com/golang/protobuf v1.5.2 // indirect |
||||
github.com/kr/text v0.2.0 // indirect |
||||
github.com/pmezard/go-difflib v1.0.0 // indirect |
||||
golang.org/x/net v0.4.0 // indirect |
||||
golang.org/x/sys v0.3.0 // indirect |
||||
golang.org/x/text v0.5.0 // indirect |
||||
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect |
||||
google.golang.org/protobuf v1.28.1 // indirect |
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect |
||||
gopkg.in/yaml.v3 v3.0.1 // indirect |
||||
) |
||||
@ -0,0 +1,69 @@ |
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= |
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= |
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= |
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= |
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= |
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= |
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= |
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= |
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= |
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= |
||||
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= |
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= |
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= |
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= |
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= |
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= |
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= |
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= |
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= |
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= |
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= |
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= |
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= |
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= |
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= |
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= |
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= |
||||
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= |
||||
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= |
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= |
||||
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= |
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
||||
golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= |
||||
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= |
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= |
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= |
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= |
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c= |
||||
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= |
||||
google.golang.org/grpc v1.52.0 h1:kd48UiU7EHsV4rnLyOJRuP/Il/UHE7gdDAQ+SZI7nZk= |
||||
google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= |
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= |
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= |
||||
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= |
||||
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= |
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= |
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= |
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,40 @@ |
||||
syntax = "proto3"; |
||||
|
||||
package logproto; |
||||
|
||||
import "gogoproto/gogo.proto"; |
||||
import "google/protobuf/timestamp.proto"; |
||||
|
||||
option go_package = "github.com/grafana/loki/pkg/push"; |
||||
|
||||
service Pusher { |
||||
rpc Push(PushRequest) returns (PushResponse) {} |
||||
} |
||||
|
||||
message PushRequest { |
||||
repeated StreamAdapter streams = 1 [ |
||||
(gogoproto.jsontag) = "streams", |
||||
(gogoproto.customtype) = "Stream" |
||||
]; |
||||
} |
||||
|
||||
message PushResponse {} |
||||
|
||||
message StreamAdapter { |
||||
string labels = 1 [(gogoproto.jsontag) = "labels"]; |
||||
repeated EntryAdapter entries = 2 [ |
||||
(gogoproto.nullable) = false, |
||||
(gogoproto.jsontag) = "entries" |
||||
]; |
||||
// hash contains the original hash of the stream. |
||||
uint64 hash = 3 [(gogoproto.jsontag) = "-"]; |
||||
} |
||||
|
||||
message EntryAdapter { |
||||
google.protobuf.Timestamp timestamp = 1 [ |
||||
(gogoproto.stdtime) = true, |
||||
(gogoproto.nullable) = false, |
||||
(gogoproto.jsontag) = "ts" |
||||
]; |
||||
string line = 2 [(gogoproto.jsontag) = "line"]; |
||||
} |
||||
@ -1,9 +1,9 @@ |
||||
package logproto |
||||
package push |
||||
|
||||
import ( |
||||
"errors" |
||||
strconv "strconv" |
||||
time "time" |
||||
"strconv" |
||||
"time" |
||||
|
||||
"github.com/gogo/protobuf/types" |
||||
) |
||||
@ -0,0 +1,465 @@ |
||||
package push |
||||
|
||||
import ( |
||||
"fmt" |
||||
"io" |
||||
"time" |
||||
) |
||||
|
||||
// Stream contains a unique labels set as a string and a set of entries for it.
|
||||
// We are not using the proto generated version but this custom one so that we
|
||||
// can improve serialization see benchmark.
|
||||
type Stream struct { |
||||
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` |
||||
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` |
||||
Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` |
||||
} |
||||
|
||||
// Entry is a log entry with a timestamp.
|
||||
type Entry struct { |
||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` |
||||
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` |
||||
} |
||||
|
||||
func (m *Stream) Marshal() (dAtA []byte, err error) { |
||||
size := m.Size() |
||||
dAtA = make([]byte, size) |
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return dAtA[:n], nil |
||||
} |
||||
|
||||
func (m *Stream) MarshalTo(dAtA []byte) (int, error) { |
||||
size := m.Size() |
||||
return m.MarshalToSizedBuffer(dAtA[:size]) |
||||
} |
||||
|
||||
func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) { |
||||
i := len(dAtA) |
||||
_ = i |
||||
var l int |
||||
_ = l |
||||
if m.Hash != 0 { |
||||
i = encodeVarintPush(dAtA, i, m.Hash) |
||||
i-- |
||||
dAtA[i] = 0x18 |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { |
||||
{ |
||||
size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
i -= size |
||||
i = encodeVarintPush(dAtA, i, uint64(size)) |
||||
} |
||||
i-- |
||||
dAtA[i] = 0x12 |
||||
} |
||||
} |
||||
if len(m.Labels) > 0 { |
||||
i -= len(m.Labels) |
||||
copy(dAtA[i:], m.Labels) |
||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Labels))) |
||||
i-- |
||||
dAtA[i] = 0xa |
||||
} |
||||
return len(dAtA) - i, nil |
||||
} |
||||
|
||||
func (m *Entry) Marshal() (dAtA []byte, err error) { |
||||
size := m.Size() |
||||
dAtA = make([]byte, size) |
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return dAtA[:n], nil |
||||
} |
||||
|
||||
func (m *Entry) MarshalTo(dAtA []byte) (int, error) { |
||||
size := m.Size() |
||||
return m.MarshalToSizedBuffer(dAtA[:size]) |
||||
} |
||||
|
||||
func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) { |
||||
i := len(dAtA) |
||||
_ = i |
||||
var l int |
||||
_ = l |
||||
if len(m.Line) > 0 { |
||||
i -= len(m.Line) |
||||
copy(dAtA[i:], m.Line) |
||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Line))) |
||||
i-- |
||||
dAtA[i] = 0x12 |
||||
} |
||||
n7, err7 := StdTimeMarshalTo(m.Timestamp, dAtA[i-SizeOfStdTime(m.Timestamp):]) |
||||
if err7 != nil { |
||||
return 0, err7 |
||||
} |
||||
i -= n7 |
||||
i = encodeVarintPush(dAtA, i, uint64(n7)) |
||||
i-- |
||||
dAtA[i] = 0xa |
||||
return len(dAtA) - i, nil |
||||
} |
||||
|
||||
func (m *Stream) Unmarshal(dAtA []byte) error { |
||||
l := len(dAtA) |
||||
iNdEx := 0 |
||||
for iNdEx < l { |
||||
preIndex := iNdEx |
||||
var wire uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
wire |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
fieldNum := int32(wire >> 3) |
||||
wireType := int(wire & 0x7) |
||||
if wireType == 4 { |
||||
return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group") |
||||
} |
||||
if fieldNum <= 0 { |
||||
return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire) |
||||
} |
||||
switch fieldNum { |
||||
case 1: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) |
||||
} |
||||
var stringLen uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
stringLen |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
intStringLen := int(stringLen) |
||||
if intStringLen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + intStringLen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Labels = string(dAtA[iNdEx:postIndex]) |
||||
iNdEx = postIndex |
||||
case 2: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) |
||||
} |
||||
var msglen int |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
msglen |= int(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
if msglen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + msglen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Entries = append(m.Entries, Entry{}) |
||||
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { |
||||
return err |
||||
} |
||||
iNdEx = postIndex |
||||
case 3: |
||||
if wireType != 0 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) |
||||
} |
||||
m.Hash = 0 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
m.Hash |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
default: |
||||
iNdEx = preIndex |
||||
skippy, err := skipPush(dAtA[iNdEx:]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if skippy < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
iNdEx += skippy |
||||
} |
||||
} |
||||
|
||||
if iNdEx > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Entry) Unmarshal(dAtA []byte) error { |
||||
l := len(dAtA) |
||||
iNdEx := 0 |
||||
for iNdEx < l { |
||||
preIndex := iNdEx |
||||
var wire uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
wire |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
fieldNum := int32(wire >> 3) |
||||
wireType := int(wire & 0x7) |
||||
if wireType == 4 { |
||||
return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group") |
||||
} |
||||
if fieldNum <= 0 { |
||||
return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire) |
||||
} |
||||
switch fieldNum { |
||||
case 1: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) |
||||
} |
||||
var msglen int |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
msglen |= int(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
if msglen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + msglen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { |
||||
return err |
||||
} |
||||
iNdEx = postIndex |
||||
case 2: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) |
||||
} |
||||
var stringLen uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
stringLen |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
intStringLen := int(stringLen) |
||||
if intStringLen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + intStringLen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Line = string(dAtA[iNdEx:postIndex]) |
||||
iNdEx = postIndex |
||||
default: |
||||
iNdEx = preIndex |
||||
skippy, err := skipPush(dAtA[iNdEx:]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if skippy < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
iNdEx += skippy |
||||
} |
||||
} |
||||
|
||||
if iNdEx > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Stream) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = len(m.Labels) |
||||
if l > 0 { |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for _, e := range m.Entries { |
||||
l = e.Size() |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
} |
||||
} |
||||
if m.Hash != 0 { |
||||
n += 1 + sovPush(m.Hash) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Entry) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = SizeOfStdTime(m.Timestamp) |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
l = len(m.Line) |
||||
if l > 0 { |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Stream) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Stream) |
||||
if !ok { |
||||
that2, ok := that.(Stream) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if m.Labels != that1.Labels { |
||||
return false |
||||
} |
||||
if len(m.Entries) != len(that1.Entries) { |
||||
return false |
||||
} |
||||
for i := range m.Entries { |
||||
if !m.Entries[i].Equal(that1.Entries[i]) { |
||||
return false |
||||
} |
||||
} |
||||
return m.Hash == that1.Hash |
||||
} |
||||
|
||||
func (m *Entry) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Entry) |
||||
if !ok { |
||||
that2, ok := that.(Entry) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if !m.Timestamp.Equal(that1.Timestamp) { |
||||
return false |
||||
} |
||||
if m.Line != that1.Line { |
||||
return false |
||||
} |
||||
return true |
||||
} |
||||
@ -1,4 +1,4 @@ |
||||
package logproto |
||||
package push |
||||
|
||||
import ( |
||||
"testing" |
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,40 @@ |
||||
syntax = "proto3"; |
||||
|
||||
package logproto; |
||||
|
||||
import "gogoproto/gogo.proto"; |
||||
import "google/protobuf/timestamp.proto"; |
||||
|
||||
option go_package = "github.com/grafana/loki/pkg/push"; |
||||
|
||||
service Pusher { |
||||
rpc Push(PushRequest) returns (PushResponse) {} |
||||
} |
||||
|
||||
message PushRequest { |
||||
repeated StreamAdapter streams = 1 [ |
||||
(gogoproto.jsontag) = "streams", |
||||
(gogoproto.customtype) = "Stream" |
||||
]; |
||||
} |
||||
|
||||
message PushResponse {} |
||||
|
||||
message StreamAdapter { |
||||
string labels = 1 [(gogoproto.jsontag) = "labels"]; |
||||
repeated EntryAdapter entries = 2 [ |
||||
(gogoproto.nullable) = false, |
||||
(gogoproto.jsontag) = "entries" |
||||
]; |
||||
// hash contains the original hash of the stream. |
||||
uint64 hash = 3 [(gogoproto.jsontag) = "-"]; |
||||
} |
||||
|
||||
message EntryAdapter { |
||||
google.protobuf.Timestamp timestamp = 1 [ |
||||
(gogoproto.stdtime) = true, |
||||
(gogoproto.nullable) = false, |
||||
(gogoproto.jsontag) = "ts" |
||||
]; |
||||
string line = 2 [(gogoproto.jsontag) = "line"]; |
||||
} |
||||
@ -0,0 +1,106 @@ |
||||
package push |
||||
|
||||
import ( |
||||
"errors" |
||||
"strconv" |
||||
"time" |
||||
|
||||
"github.com/gogo/protobuf/types" |
||||
) |
||||
|
||||
const ( |
||||
// Seconds field of the earliest valid Timestamp.
|
||||
// This is time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
|
||||
minValidSeconds = -62135596800 |
||||
// Seconds field just after the latest valid Timestamp.
|
||||
// This is time.Date(10000, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
|
||||
maxValidSeconds = 253402300800 |
||||
) |
||||
|
||||
// validateTimestamp determines whether a Timestamp is valid.
|
||||
// A valid timestamp represents a time in the range
|
||||
// [0001-01-01, 10000-01-01) and has a Nanos field
|
||||
// in the range [0, 1e9).
|
||||
//
|
||||
// If the Timestamp is valid, validateTimestamp returns nil.
|
||||
// Otherwise, it returns an error that describes
|
||||
// the problem.
|
||||
//
|
||||
// Every valid Timestamp can be represented by a time.Time, but the converse is not true.
|
||||
func validateTimestamp(ts *types.Timestamp) error { |
||||
if ts == nil { |
||||
return errors.New("timestamp: nil Timestamp") |
||||
} |
||||
if ts.Seconds < minValidSeconds { |
||||
return errors.New("timestamp: " + formatTimestamp(ts) + " before 0001-01-01") |
||||
} |
||||
if ts.Seconds >= maxValidSeconds { |
||||
return errors.New("timestamp: " + formatTimestamp(ts) + " after 10000-01-01") |
||||
} |
||||
if ts.Nanos < 0 || ts.Nanos >= 1e9 { |
||||
return errors.New("timestamp: " + formatTimestamp(ts) + ": nanos not in range [0, 1e9)") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// formatTimestamp is equivalent to fmt.Sprintf("%#v", ts)
|
||||
// but avoids the escape incurred by using fmt.Sprintf, eliminating
|
||||
// unnecessary heap allocations.
|
||||
func formatTimestamp(ts *types.Timestamp) string { |
||||
if ts == nil { |
||||
return "nil" |
||||
} |
||||
|
||||
seconds := strconv.FormatInt(ts.Seconds, 10) |
||||
nanos := strconv.FormatInt(int64(ts.Nanos), 10) |
||||
return "&types.Timestamp{Seconds: " + seconds + ",\nNanos: " + nanos + ",\n}" |
||||
} |
||||
|
||||
func SizeOfStdTime(t time.Time) int { |
||||
ts, err := timestampProto(t) |
||||
if err != nil { |
||||
return 0 |
||||
} |
||||
return ts.Size() |
||||
} |
||||
|
||||
func StdTimeMarshalTo(t time.Time, data []byte) (int, error) { |
||||
ts, err := timestampProto(t) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return ts.MarshalTo(data) |
||||
} |
||||
|
||||
func StdTimeUnmarshal(t *time.Time, data []byte) error { |
||||
ts := &types.Timestamp{} |
||||
if err := ts.Unmarshal(data); err != nil { |
||||
return err |
||||
} |
||||
tt, err := timestampFromProto(ts) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
*t = tt |
||||
return nil |
||||
} |
||||
|
||||
func timestampFromProto(ts *types.Timestamp) (time.Time, error) { |
||||
// Don't return the zero value on error, because corresponds to a valid
|
||||
// timestamp. Instead return whatever time.Unix gives us.
|
||||
var t time.Time |
||||
if ts == nil { |
||||
t = time.Unix(0, 0).UTC() // treat nil like the empty Timestamp
|
||||
} else { |
||||
t = time.Unix(ts.Seconds, int64(ts.Nanos)).UTC() |
||||
} |
||||
return t, validateTimestamp(ts) |
||||
} |
||||
|
||||
func timestampProto(t time.Time) (types.Timestamp, error) { |
||||
ts := types.Timestamp{ |
||||
Seconds: t.Unix(), |
||||
Nanos: int32(t.Nanosecond()), |
||||
} |
||||
return ts, validateTimestamp(&ts) |
||||
} |
||||
@ -0,0 +1,465 @@ |
||||
package push |
||||
|
||||
import ( |
||||
"fmt" |
||||
"io" |
||||
"time" |
||||
) |
||||
|
||||
// Stream contains a unique labels set as a string and a set of entries for it.
|
||||
// We are not using the proto generated version but this custom one so that we
|
||||
// can improve serialization see benchmark.
|
||||
type Stream struct { |
||||
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` |
||||
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` |
||||
Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` |
||||
} |
||||
|
||||
// Entry is a log entry with a timestamp.
|
||||
type Entry struct { |
||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` |
||||
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` |
||||
} |
||||
|
||||
func (m *Stream) Marshal() (dAtA []byte, err error) { |
||||
size := m.Size() |
||||
dAtA = make([]byte, size) |
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return dAtA[:n], nil |
||||
} |
||||
|
||||
func (m *Stream) MarshalTo(dAtA []byte) (int, error) { |
||||
size := m.Size() |
||||
return m.MarshalToSizedBuffer(dAtA[:size]) |
||||
} |
||||
|
||||
func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) { |
||||
i := len(dAtA) |
||||
_ = i |
||||
var l int |
||||
_ = l |
||||
if m.Hash != 0 { |
||||
i = encodeVarintPush(dAtA, i, m.Hash) |
||||
i-- |
||||
dAtA[i] = 0x18 |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { |
||||
{ |
||||
size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
i -= size |
||||
i = encodeVarintPush(dAtA, i, uint64(size)) |
||||
} |
||||
i-- |
||||
dAtA[i] = 0x12 |
||||
} |
||||
} |
||||
if len(m.Labels) > 0 { |
||||
i -= len(m.Labels) |
||||
copy(dAtA[i:], m.Labels) |
||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Labels))) |
||||
i-- |
||||
dAtA[i] = 0xa |
||||
} |
||||
return len(dAtA) - i, nil |
||||
} |
||||
|
||||
func (m *Entry) Marshal() (dAtA []byte, err error) { |
||||
size := m.Size() |
||||
dAtA = make([]byte, size) |
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return dAtA[:n], nil |
||||
} |
||||
|
||||
func (m *Entry) MarshalTo(dAtA []byte) (int, error) { |
||||
size := m.Size() |
||||
return m.MarshalToSizedBuffer(dAtA[:size]) |
||||
} |
||||
|
||||
func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) { |
||||
i := len(dAtA) |
||||
_ = i |
||||
var l int |
||||
_ = l |
||||
if len(m.Line) > 0 { |
||||
i -= len(m.Line) |
||||
copy(dAtA[i:], m.Line) |
||||
i = encodeVarintPush(dAtA, i, uint64(len(m.Line))) |
||||
i-- |
||||
dAtA[i] = 0x12 |
||||
} |
||||
n7, err7 := StdTimeMarshalTo(m.Timestamp, dAtA[i-SizeOfStdTime(m.Timestamp):]) |
||||
if err7 != nil { |
||||
return 0, err7 |
||||
} |
||||
i -= n7 |
||||
i = encodeVarintPush(dAtA, i, uint64(n7)) |
||||
i-- |
||||
dAtA[i] = 0xa |
||||
return len(dAtA) - i, nil |
||||
} |
||||
|
||||
func (m *Stream) Unmarshal(dAtA []byte) error { |
||||
l := len(dAtA) |
||||
iNdEx := 0 |
||||
for iNdEx < l { |
||||
preIndex := iNdEx |
||||
var wire uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
wire |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
fieldNum := int32(wire >> 3) |
||||
wireType := int(wire & 0x7) |
||||
if wireType == 4 { |
||||
return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group") |
||||
} |
||||
if fieldNum <= 0 { |
||||
return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire) |
||||
} |
||||
switch fieldNum { |
||||
case 1: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) |
||||
} |
||||
var stringLen uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
stringLen |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
intStringLen := int(stringLen) |
||||
if intStringLen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + intStringLen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Labels = string(dAtA[iNdEx:postIndex]) |
||||
iNdEx = postIndex |
||||
case 2: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) |
||||
} |
||||
var msglen int |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
msglen |= int(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
if msglen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + msglen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Entries = append(m.Entries, Entry{}) |
||||
if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { |
||||
return err |
||||
} |
||||
iNdEx = postIndex |
||||
case 3: |
||||
if wireType != 0 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) |
||||
} |
||||
m.Hash = 0 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
m.Hash |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
default: |
||||
iNdEx = preIndex |
||||
skippy, err := skipPush(dAtA[iNdEx:]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if skippy < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
iNdEx += skippy |
||||
} |
||||
} |
||||
|
||||
if iNdEx > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Entry) Unmarshal(dAtA []byte) error { |
||||
l := len(dAtA) |
||||
iNdEx := 0 |
||||
for iNdEx < l { |
||||
preIndex := iNdEx |
||||
var wire uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
wire |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
fieldNum := int32(wire >> 3) |
||||
wireType := int(wire & 0x7) |
||||
if wireType == 4 { |
||||
return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group") |
||||
} |
||||
if fieldNum <= 0 { |
||||
return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire) |
||||
} |
||||
switch fieldNum { |
||||
case 1: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) |
||||
} |
||||
var msglen int |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
msglen |= int(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
if msglen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + msglen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { |
||||
return err |
||||
} |
||||
iNdEx = postIndex |
||||
case 2: |
||||
if wireType != 2 { |
||||
return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType) |
||||
} |
||||
var stringLen uint64 |
||||
for shift := uint(0); ; shift += 7 { |
||||
if shift >= 64 { |
||||
return ErrIntOverflowPush |
||||
} |
||||
if iNdEx >= l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
b := dAtA[iNdEx] |
||||
iNdEx++ |
||||
stringLen |= uint64(b&0x7F) << shift |
||||
if b < 0x80 { |
||||
break |
||||
} |
||||
} |
||||
intStringLen := int(stringLen) |
||||
if intStringLen < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
postIndex := iNdEx + intStringLen |
||||
if postIndex < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if postIndex > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
m.Line = string(dAtA[iNdEx:postIndex]) |
||||
iNdEx = postIndex |
||||
default: |
||||
iNdEx = preIndex |
||||
skippy, err := skipPush(dAtA[iNdEx:]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if skippy < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) < 0 { |
||||
return ErrInvalidLengthPush |
||||
} |
||||
if (iNdEx + skippy) > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
iNdEx += skippy |
||||
} |
||||
} |
||||
|
||||
if iNdEx > l { |
||||
return io.ErrUnexpectedEOF |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Stream) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = len(m.Labels) |
||||
if l > 0 { |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for _, e := range m.Entries { |
||||
l = e.Size() |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
} |
||||
} |
||||
if m.Hash != 0 { |
||||
n += 1 + sovPush(m.Hash) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Entry) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = SizeOfStdTime(m.Timestamp) |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
l = len(m.Line) |
||||
if l > 0 { |
||||
n += 1 + l + sovPush(uint64(l)) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Stream) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Stream) |
||||
if !ok { |
||||
that2, ok := that.(Stream) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if m.Labels != that1.Labels { |
||||
return false |
||||
} |
||||
if len(m.Entries) != len(that1.Entries) { |
||||
return false |
||||
} |
||||
for i := range m.Entries { |
||||
if !m.Entries[i].Equal(that1.Entries[i]) { |
||||
return false |
||||
} |
||||
} |
||||
return m.Hash == that1.Hash |
||||
} |
||||
|
||||
func (m *Entry) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Entry) |
||||
if !ok { |
||||
that2, ok := that.(Entry) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if !m.Timestamp.Equal(that1.Timestamp) { |
||||
return false |
||||
} |
||||
if m.Line != that1.Line { |
||||
return false |
||||
} |
||||
return true |
||||
} |
||||
149
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/cluster_resource_type.go
generated
vendored
149
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/cluster_resource_type.go
generated
vendored
@ -0,0 +1,149 @@ |
||||
/* |
||||
* |
||||
* Copyright 2022 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package xdsresource |
||||
|
||||
import ( |
||||
"google.golang.org/grpc/internal/pretty" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
var ( |
||||
// Compile time interface checks.
|
||||
_ Type = clusterResourceType{} |
||||
_ ResourceData = &ClusterResourceData{} |
||||
|
||||
// Singleton instantiation of the resource type implementation.
|
||||
clusterType = clusterResourceType{ |
||||
resourceTypeState: resourceTypeState{ |
||||
v2TypeURL: "type.googleapis.com/envoy.api.v2.Cluster", |
||||
v3TypeURL: "type.googleapis.com/envoy.config.cluster.v3.Cluster", |
||||
typeEnum: ClusterResource, |
||||
allResourcesRequiredInSotW: true, |
||||
}, |
||||
} |
||||
) |
||||
|
||||
// clusterResourceType provides the resource-type specific functionality for a
|
||||
// Cluster resource.
|
||||
//
|
||||
// Implements the Type interface.
|
||||
type clusterResourceType struct { |
||||
resourceTypeState |
||||
} |
||||
|
||||
// Decode deserializes and validates an xDS resource serialized inside the
|
||||
// provided `Any` proto, as received from the xDS management server.
|
||||
func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { |
||||
name, cluster, err := unmarshalClusterResource(resource, nil, opts.Logger) |
||||
switch { |
||||
case name == "": |
||||
// Name is unset only when protobuf deserialization fails.
|
||||
return nil, err |
||||
case err != nil: |
||||
// Protobuf deserialization succeeded, but resource validation failed.
|
||||
return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err |
||||
} |
||||
|
||||
// Perform extra validation here.
|
||||
if err := securityConfigValidator(opts.BootstrapConfig, cluster.SecurityCfg); err != nil { |
||||
return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err |
||||
} |
||||
|
||||
return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: cluster}}, nil |
||||
|
||||
} |
||||
|
||||
// ClusterResourceData wraps the configuration of a Cluster resource as received
|
||||
// from the management server.
|
||||
//
|
||||
// Implements the ResourceData interface.
|
||||
type ClusterResourceData struct { |
||||
ResourceData |
||||
|
||||
// TODO: We have always stored update structs by value. See if this can be
|
||||
// switched to a pointer?
|
||||
Resource ClusterUpdate |
||||
} |
||||
|
||||
// Equal returns true if other is equal to r.
|
||||
func (c *ClusterResourceData) Equal(other ResourceData) bool { |
||||
if c == nil && other == nil { |
||||
return true |
||||
} |
||||
if (c == nil) != (other == nil) { |
||||
return false |
||||
} |
||||
return proto.Equal(c.Resource.Raw, other.Raw()) |
||||
|
||||
} |
||||
|
||||
// ToJSON returns a JSON string representation of the resource data.
|
||||
func (c *ClusterResourceData) ToJSON() string { |
||||
return pretty.ToJSON(c.Resource) |
||||
} |
||||
|
||||
// Raw returns the underlying raw protobuf form of the cluster resource.
|
||||
func (c *ClusterResourceData) Raw() *anypb.Any { |
||||
return c.Resource.Raw |
||||
} |
||||
|
||||
// ClusterWatcher wraps the callbacks to be invoked for different events
|
||||
// corresponding to the cluster resource being watched.
|
||||
type ClusterWatcher interface { |
||||
// OnUpdate is invoked to report an update for the resource being watched.
|
||||
OnUpdate(*ClusterResourceData) |
||||
|
||||
// OnError is invoked under different error conditions including but not
|
||||
// limited to the following:
|
||||
// - authority mentioned in the resource is not found
|
||||
// - resource name parsing error
|
||||
// - resource deserialization error
|
||||
// - resource validation error
|
||||
// - ADS stream failure
|
||||
// - connection failure
|
||||
OnError(error) |
||||
|
||||
// OnResourceDoesNotExist is invoked for a specific error condition where
|
||||
// the requested resource is not found on the xDS management server.
|
||||
OnResourceDoesNotExist() |
||||
} |
||||
|
||||
type delegatingClusterWatcher struct { |
||||
watcher ClusterWatcher |
||||
} |
||||
|
||||
func (d *delegatingClusterWatcher) OnUpdate(data ResourceData) { |
||||
c := data.(*ClusterResourceData) |
||||
d.watcher.OnUpdate(c) |
||||
} |
||||
|
||||
func (d *delegatingClusterWatcher) OnError(err error) { |
||||
d.watcher.OnError(err) |
||||
} |
||||
|
||||
func (d *delegatingClusterWatcher) OnResourceDoesNotExist() { |
||||
d.watcher.OnResourceDoesNotExist() |
||||
} |
||||
|
||||
// WatchCluster uses xDS to discover the configuration associated with the
|
||||
// provided cluster resource name.
|
||||
func WatchCluster(p Producer, name string, w ClusterWatcher) (cancel func()) { |
||||
delegator := &delegatingClusterWatcher{watcher: w} |
||||
return p.WatchResource(clusterType, name, delegator) |
||||
} |
||||
144
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go
generated
vendored
144
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/endpoints_resource_type.go
generated
vendored
@ -0,0 +1,144 @@ |
||||
/* |
||||
* |
||||
* Copyright 2022 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package xdsresource |
||||
|
||||
import ( |
||||
"google.golang.org/grpc/internal/pretty" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
var ( |
||||
// Compile time interface checks.
|
||||
_ Type = endpointsResourceType{} |
||||
_ ResourceData = &EndpointsResourceData{} |
||||
|
||||
// Singleton instantiation of the resource type implementation.
|
||||
endpointsType = endpointsResourceType{ |
||||
resourceTypeState: resourceTypeState{ |
||||
v2TypeURL: "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", |
||||
v3TypeURL: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", |
||||
typeEnum: EndpointsResource, |
||||
allResourcesRequiredInSotW: false, |
||||
}, |
||||
} |
||||
) |
||||
|
||||
// endpointsResourceType provides the resource-type specific functionality for a
|
||||
// ClusterLoadAssignment (or Endpoints) resource.
|
||||
//
|
||||
// Implements the Type interface.
|
||||
type endpointsResourceType struct { |
||||
resourceTypeState |
||||
} |
||||
|
||||
// Decode deserializes and validates an xDS resource serialized inside the
|
||||
// provided `Any` proto, as received from the xDS management server.
|
||||
func (endpointsResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { |
||||
name, rc, err := unmarshalEndpointsResource(resource, opts.Logger) |
||||
switch { |
||||
case name == "": |
||||
// Name is unset only when protobuf deserialization fails.
|
||||
return nil, err |
||||
case err != nil: |
||||
// Protobuf deserialization succeeded, but resource validation failed.
|
||||
return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: EndpointsUpdate{}}}, err |
||||
} |
||||
|
||||
return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: rc}}, nil |
||||
|
||||
} |
||||
|
||||
// EndpointsResourceData wraps the configuration of an Endpoints resource as
|
||||
// received from the management server.
|
||||
//
|
||||
// Implements the ResourceData interface.
|
||||
type EndpointsResourceData struct { |
||||
ResourceData |
||||
|
||||
// TODO: We have always stored update structs by value. See if this can be
|
||||
// switched to a pointer?
|
||||
Resource EndpointsUpdate |
||||
} |
||||
|
||||
// Equal returns true if other is equal to r.
|
||||
func (e *EndpointsResourceData) Equal(other ResourceData) bool { |
||||
if e == nil && other == nil { |
||||
return true |
||||
} |
||||
if (e == nil) != (other == nil) { |
||||
return false |
||||
} |
||||
return proto.Equal(e.Resource.Raw, other.Raw()) |
||||
|
||||
} |
||||
|
||||
// ToJSON returns a JSON string representation of the resource data.
|
||||
func (e *EndpointsResourceData) ToJSON() string { |
||||
return pretty.ToJSON(e.Resource) |
||||
} |
||||
|
||||
// Raw returns the underlying raw protobuf form of the listener resource.
|
||||
func (e *EndpointsResourceData) Raw() *anypb.Any { |
||||
return e.Resource.Raw |
||||
} |
||||
|
||||
// EndpointsWatcher wraps the callbacks to be invoked for different
|
||||
// events corresponding to the endpoints resource being watched.
|
||||
type EndpointsWatcher interface { |
||||
// OnUpdate is invoked to report an update for the resource being watched.
|
||||
OnUpdate(*EndpointsResourceData) |
||||
|
||||
// OnError is invoked under different error conditions including but not
|
||||
// limited to the following:
|
||||
// - authority mentioned in the resource is not found
|
||||
// - resource name parsing error
|
||||
// - resource deserialization error
|
||||
// - resource validation error
|
||||
// - ADS stream failure
|
||||
// - connection failure
|
||||
OnError(error) |
||||
|
||||
// OnResourceDoesNotExist is invoked for a specific error condition where
|
||||
// the requested resource is not found on the xDS management server.
|
||||
OnResourceDoesNotExist() |
||||
} |
||||
|
||||
type delegatingEndpointsWatcher struct { |
||||
watcher EndpointsWatcher |
||||
} |
||||
|
||||
func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData) { |
||||
e := data.(*EndpointsResourceData) |
||||
d.watcher.OnUpdate(e) |
||||
} |
||||
|
||||
func (d *delegatingEndpointsWatcher) OnError(err error) { |
||||
d.watcher.OnError(err) |
||||
} |
||||
|
||||
func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist() { |
||||
d.watcher.OnResourceDoesNotExist() |
||||
} |
||||
|
||||
// WatchEndpoints uses xDS to discover the configuration associated with the
|
||||
// provided endpoints resource name.
|
||||
func WatchEndpoints(p Producer, name string, w EndpointsWatcher) (cancel func()) { |
||||
delegator := &delegatingEndpointsWatcher{watcher: w} |
||||
return p.WatchResource(endpointsType, name, delegator) |
||||
} |
||||
181
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/listener_resource_type.go
generated
vendored
181
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/listener_resource_type.go
generated
vendored
@ -0,0 +1,181 @@ |
||||
/* |
||||
* |
||||
* Copyright 2022 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package xdsresource |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"google.golang.org/grpc/internal/pretty" |
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
var ( |
||||
// Compile time interface checks.
|
||||
_ Type = listenerResourceType{} |
||||
_ ResourceData = &ListenerResourceData{} |
||||
|
||||
// Singleton instantiation of the resource type implementation.
|
||||
listenerType = listenerResourceType{ |
||||
resourceTypeState: resourceTypeState{ |
||||
v2TypeURL: "type.googleapis.com/envoy.api.v2.Listener", |
||||
v3TypeURL: "type.googleapis.com/envoy.config.listener.v3.Listener", |
||||
typeEnum: ListenerResource, |
||||
allResourcesRequiredInSotW: true, |
||||
}, |
||||
} |
||||
) |
||||
|
||||
// listenerResourceType provides the resource-type specific functionality for a
|
||||
// Listener resource.
|
||||
//
|
||||
// Implements the Type interface.
|
||||
type listenerResourceType struct { |
||||
resourceTypeState |
||||
} |
||||
|
||||
func securityConfigValidator(bc *bootstrap.Config, sc *SecurityConfig) error { |
||||
if sc == nil { |
||||
return nil |
||||
} |
||||
if sc.IdentityInstanceName != "" { |
||||
if _, ok := bc.CertProviderConfigs[sc.IdentityInstanceName]; !ok { |
||||
return fmt.Errorf("identitiy certificate provider instance name %q missing in bootstrap configuration", sc.IdentityInstanceName) |
||||
} |
||||
} |
||||
if sc.RootInstanceName != "" { |
||||
if _, ok := bc.CertProviderConfigs[sc.RootInstanceName]; !ok { |
||||
return fmt.Errorf("root certificate provider instance name %q missing in bootstrap configuration", sc.RootInstanceName) |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func listenerValidator(bc *bootstrap.Config, lis ListenerUpdate) error { |
||||
if lis.InboundListenerCfg == nil || lis.InboundListenerCfg.FilterChains == nil { |
||||
return nil |
||||
} |
||||
return lis.InboundListenerCfg.FilterChains.Validate(func(fc *FilterChain) error { |
||||
if fc == nil { |
||||
return nil |
||||
} |
||||
return securityConfigValidator(bc, fc.SecurityCfg) |
||||
}) |
||||
} |
||||
|
||||
// Decode deserializes and validates an xDS resource serialized inside the
|
||||
// provided `Any` proto, as received from the xDS management server.
|
||||
func (listenerResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { |
||||
name, listener, err := unmarshalListenerResource(resource, nil, opts.Logger) |
||||
switch { |
||||
case name == "": |
||||
// Name is unset only when protobuf deserialization fails.
|
||||
return nil, err |
||||
case err != nil: |
||||
// Protobuf deserialization succeeded, but resource validation failed.
|
||||
return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err |
||||
} |
||||
|
||||
// Perform extra validation here.
|
||||
if err := listenerValidator(opts.BootstrapConfig, listener); err != nil { |
||||
return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err |
||||
} |
||||
|
||||
return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil |
||||
|
||||
} |
||||
|
||||
// ListenerResourceData wraps the configuration of a Listener resource as
|
||||
// received from the management server.
|
||||
//
|
||||
// Implements the ResourceData interface.
|
||||
type ListenerResourceData struct { |
||||
ResourceData |
||||
|
||||
// TODO: We have always stored update structs by value. See if this can be
|
||||
// switched to a pointer?
|
||||
Resource ListenerUpdate |
||||
} |
||||
|
||||
// Equal returns true if other is equal to l.
|
||||
func (l *ListenerResourceData) Equal(other ResourceData) bool { |
||||
if l == nil && other == nil { |
||||
return true |
||||
} |
||||
if (l == nil) != (other == nil) { |
||||
return false |
||||
} |
||||
return proto.Equal(l.Resource.Raw, other.Raw()) |
||||
|
||||
} |
||||
|
||||
// ToJSON returns a JSON string representation of the resource data.
|
||||
func (l *ListenerResourceData) ToJSON() string { |
||||
return pretty.ToJSON(l.Resource) |
||||
} |
||||
|
||||
// Raw returns the underlying raw protobuf form of the listener resource.
|
||||
func (l *ListenerResourceData) Raw() *anypb.Any { |
||||
return l.Resource.Raw |
||||
} |
||||
|
||||
// ListenerWatcher wraps the callbacks to be invoked for different
|
||||
// events corresponding to the listener resource being watched.
|
||||
type ListenerWatcher interface { |
||||
// OnUpdate is invoked to report an update for the resource being watched.
|
||||
OnUpdate(*ListenerResourceData) |
||||
|
||||
// OnError is invoked under different error conditions including but not
|
||||
// limited to the following:
|
||||
// - authority mentioned in the resource is not found
|
||||
// - resource name parsing error
|
||||
// - resource deserialization error
|
||||
// - resource validation error
|
||||
// - ADS stream failure
|
||||
// - connection failure
|
||||
OnError(error) |
||||
|
||||
// OnResourceDoesNotExist is invoked for a specific error condition where
|
||||
// the requested resource is not found on the xDS management server.
|
||||
OnResourceDoesNotExist() |
||||
} |
||||
|
||||
type delegatingListenerWatcher struct { |
||||
watcher ListenerWatcher |
||||
} |
||||
|
||||
func (d *delegatingListenerWatcher) OnUpdate(data ResourceData) { |
||||
l := data.(*ListenerResourceData) |
||||
d.watcher.OnUpdate(l) |
||||
} |
||||
|
||||
func (d *delegatingListenerWatcher) OnError(err error) { |
||||
d.watcher.OnError(err) |
||||
} |
||||
|
||||
func (d *delegatingListenerWatcher) OnResourceDoesNotExist() { |
||||
d.watcher.OnResourceDoesNotExist() |
||||
} |
||||
|
||||
// WatchListener uses xDS to discover the configuration associated with the
|
||||
// provided listener resource name.
|
||||
func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) { |
||||
delegator := &delegatingListenerWatcher{watcher: w} |
||||
return p.WatchResource(listenerType, name, delegator) |
||||
} |
||||
158
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/resource_type.go
generated
vendored
158
vendor/google.golang.org/grpc/xds/internal/xdsclient/xdsresource/resource_type.go
generated
vendored
@ -0,0 +1,158 @@ |
||||
/* |
||||
* |
||||
* Copyright 2022 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package xdsresource |
||||
|
||||
import ( |
||||
"google.golang.org/grpc/internal/grpclog" |
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
// Producer contains a single method to discover resource configuration from a
|
||||
// remote management server using xDS APIs.
|
||||
//
|
||||
// The xdsclient package provides a concrete implementation of this interface.
|
||||
type Producer interface { |
||||
// WatchResource uses xDS to discover the resource associated with the
|
||||
// provided resource name. The resource type implementation determines how
|
||||
// xDS requests are sent out and how responses are deserialized and
|
||||
// validated. Upon receipt of a response from the management server, an
|
||||
// appropriate callback on the watcher is invoked.
|
||||
WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func()) |
||||
} |
||||
|
||||
// ResourceWatcher wraps the callbacks to be invoked for different events
|
||||
// corresponding to the resource being watched.
|
||||
type ResourceWatcher interface { |
||||
// OnUpdate is invoked to report an update for the resource being watched.
|
||||
// The ResourceData parameter needs to be type asserted to the appropriate
|
||||
// type for the resource being watched.
|
||||
OnUpdate(ResourceData) |
||||
|
||||
// OnError is invoked under different error conditions including but not
|
||||
// limited to the following:
|
||||
// - authority mentioned in the resource is not found
|
||||
// - resource name parsing error
|
||||
// - resource deserialization error
|
||||
// - resource validation error
|
||||
// - ADS stream failure
|
||||
// - connection failure
|
||||
OnError(error) |
||||
|
||||
// OnResourceDoesNotExist is invoked for a specific error condition where
|
||||
// the requested resource is not found on the xDS management server.
|
||||
OnResourceDoesNotExist() |
||||
} |
||||
|
||||
// TODO: Once the implementation is complete, rename this interface as
|
||||
// ResourceType and get rid of the existing ResourceType enum.
|
||||
|
||||
// Type wraps all resource-type specific functionality. Each supported resource
|
||||
// type will provide an implementation of this interface.
|
||||
type Type interface { |
||||
// V2TypeURL is the xDS type URL of this resource type for v2 transport.
|
||||
V2TypeURL() string |
||||
|
||||
// V3TypeURL is the xDS type URL of this resource type for v3 transport.
|
||||
V3TypeURL() string |
||||
|
||||
// TypeEnum is an enumerated value for this resource type. This can be used
|
||||
// for logging/debugging purposes, as well in cases where the resource type
|
||||
// is to be uniquely identified but the actual functionality provided by the
|
||||
// resource type is not required.
|
||||
//
|
||||
// TODO: once Type is renamed to ResourceType, rename ResourceType to
|
||||
// ResourceTypeEnum.
|
||||
TypeEnum() ResourceType |
||||
|
||||
// AllResourcesRequiredInSotW indicates whether this resource type requires
|
||||
// that all resources be present in every SotW response from the server. If
|
||||
// true, a response that does not include a previously seen resource will be
|
||||
// interpreted as a deletion of that resource.
|
||||
AllResourcesRequiredInSotW() bool |
||||
|
||||
// Decode deserializes and validates an xDS resource serialized inside the
|
||||
// provided `Any` proto, as received from the xDS management server.
|
||||
//
|
||||
// If protobuf deserialization fails or resource validation fails,
|
||||
// returns a non-nil error. Otherwise, returns a fully populated
|
||||
// DecodeResult.
|
||||
Decode(*DecodeOptions, *anypb.Any) (*DecodeResult, error) |
||||
} |
||||
|
||||
// ResourceData contains the configuration data sent by the xDS management
|
||||
// server, associated with the resource being watched. Every resource type must
|
||||
// provide an implementation of this interface to represent the configuration
|
||||
// received from the xDS management server.
|
||||
type ResourceData interface { |
||||
isResourceData() |
||||
|
||||
// Equal returns true if the passed in resource data is equal to that of the
|
||||
// receiver.
|
||||
Equal(ResourceData) bool |
||||
|
||||
// ToJSON returns a JSON string representation of the resource data.
|
||||
ToJSON() string |
||||
|
||||
Raw() *anypb.Any |
||||
} |
||||
|
||||
// DecodeOptions wraps the options required by ResourceType implementation for
|
||||
// decoding configuration received from the xDS management server.
|
||||
type DecodeOptions struct { |
||||
// BootstrapConfig contains the bootstrap configuration passed to the
|
||||
// top-level xdsClient. This contains useful data for resource validation.
|
||||
BootstrapConfig *bootstrap.Config |
||||
// Logger is to be used for emitting logs during the Decode operation.
|
||||
Logger *grpclog.PrefixLogger |
||||
} |
||||
|
||||
// DecodeResult is the result of a decode operation.
|
||||
type DecodeResult struct { |
||||
// Name is the name of the resource being watched.
|
||||
Name string |
||||
// Resource contains the configuration associated with the resource being
|
||||
// watched.
|
||||
Resource ResourceData |
||||
} |
||||
|
||||
// resourceTypeState wraps the static state associated with concrete resource
|
||||
// type implementations, which can then embed this struct and get the methods
|
||||
// implemented here for free.
|
||||
type resourceTypeState struct { |
||||
v2TypeURL string |
||||
v3TypeURL string |
||||
typeEnum ResourceType |
||||
allResourcesRequiredInSotW bool |
||||
} |
||||
|
||||
func (r resourceTypeState) V2TypeURL() string { |
||||
return r.v2TypeURL |
||||
} |
||||
|
||||
func (r resourceTypeState) V3TypeURL() string { |
||||
return r.v3TypeURL |
||||
} |
||||
|
||||
func (r resourceTypeState) TypeEnum() ResourceType { |
||||
return r.typeEnum |
||||
} |
||||
|
||||
func (r resourceTypeState) AllResourcesRequiredInSotW() bool { |
||||
return r.allResourcesRequiredInSotW |
||||
} |
||||
@ -0,0 +1,145 @@ |
||||
/* |
||||
* |
||||
* Copyright 2022 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package xdsresource |
||||
|
||||
import ( |
||||
"google.golang.org/grpc/internal/pretty" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
var ( |
||||
// Compile time interface checks.
|
||||
_ Type = routeConfigResourceType{} |
||||
_ ResourceData = &RouteConfigResourceData{} |
||||
|
||||
// Singleton instantiation of the resource type implementation.
|
||||
routeConfigType = routeConfigResourceType{ |
||||
resourceTypeState: resourceTypeState{ |
||||
v2TypeURL: "type.googleapis.com/envoy.api.v2.RouteConfiguration", |
||||
v3TypeURL: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", |
||||
typeEnum: RouteConfigResource, |
||||
allResourcesRequiredInSotW: false, |
||||
}, |
||||
} |
||||
) |
||||
|
||||
// routeConfigResourceType provides the resource-type specific functionality for
|
||||
// a RouteConfiguration resource.
|
||||
//
|
||||
// Implements the Type interface.
|
||||
type routeConfigResourceType struct { |
||||
resourceTypeState |
||||
} |
||||
|
||||
// Decode deserializes and validates an xDS resource serialized inside the
|
||||
// provided `Any` proto, as received from the xDS management server.
|
||||
func (routeConfigResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { |
||||
name, rc, err := unmarshalRouteConfigResource(resource, opts.Logger) |
||||
switch { |
||||
case name == "": |
||||
// Name is unset only when protobuf deserialization fails.
|
||||
return nil, err |
||||
case err != nil: |
||||
// Protobuf deserialization succeeded, but resource validation failed.
|
||||
return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: RouteConfigUpdate{}}}, err |
||||
} |
||||
|
||||
return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: rc}}, nil |
||||
|
||||
} |
||||
|
||||
// RouteConfigResourceData wraps the configuration of a RouteConfiguration
|
||||
// resource as received from the management server.
|
||||
//
|
||||
// Implements the ResourceData interface.
|
||||
type RouteConfigResourceData struct { |
||||
ResourceData |
||||
|
||||
// TODO: We have always stored update structs by value. See if this can be
|
||||
// switched to a pointer?
|
||||
Resource RouteConfigUpdate |
||||
} |
||||
|
||||
// Equal returns true if other is equal to r.
|
||||
func (r *RouteConfigResourceData) Equal(other ResourceData) bool { |
||||
if r == nil && other == nil { |
||||
return true |
||||
} |
||||
if (r == nil) != (other == nil) { |
||||
return false |
||||
} |
||||
return proto.Equal(r.Resource.Raw, other.Raw()) |
||||
|
||||
} |
||||
|
||||
// ToJSON returns a JSON string representation of the resource data.
|
||||
func (r *RouteConfigResourceData) ToJSON() string { |
||||
return pretty.ToJSON(r.Resource) |
||||
} |
||||
|
||||
// Raw returns the underlying raw protobuf form of the route configuration
|
||||
// resource.
|
||||
func (r *RouteConfigResourceData) Raw() *anypb.Any { |
||||
return r.Resource.Raw |
||||
} |
||||
|
||||
// RouteConfigWatcher wraps the callbacks to be invoked for different
|
||||
// events corresponding to the route configuration resource being watched.
|
||||
type RouteConfigWatcher interface { |
||||
// OnUpdate is invoked to report an update for the resource being watched.
|
||||
OnUpdate(*RouteConfigResourceData) |
||||
|
||||
// OnError is invoked under different error conditions including but not
|
||||
// limited to the following:
|
||||
// - authority mentioned in the resource is not found
|
||||
// - resource name parsing error
|
||||
// - resource deserialization error
|
||||
// - resource validation error
|
||||
// - ADS stream failure
|
||||
// - connection failure
|
||||
OnError(error) |
||||
|
||||
// OnResourceDoesNotExist is invoked for a specific error condition where
|
||||
// the requested resource is not found on the xDS management server.
|
||||
OnResourceDoesNotExist() |
||||
} |
||||
|
||||
type delegatingRouteConfigWatcher struct { |
||||
watcher RouteConfigWatcher |
||||
} |
||||
|
||||
func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData) { |
||||
rc := data.(*RouteConfigResourceData) |
||||
d.watcher.OnUpdate(rc) |
||||
} |
||||
|
||||
func (d *delegatingRouteConfigWatcher) OnError(err error) { |
||||
d.watcher.OnError(err) |
||||
} |
||||
|
||||
func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist() { |
||||
d.watcher.OnResourceDoesNotExist() |
||||
} |
||||
|
||||
// WatchRouteConfig uses xDS to discover the configuration associated with the
|
||||
// provided route configuration resource name.
|
||||
func WatchRouteConfig(p Producer, name string, w RouteConfigWatcher) (cancel func()) { |
||||
delegator := &delegatingRouteConfigWatcher{watcher: w} |
||||
return p.WatchResource(routeConfigType, name, delegator) |
||||
} |
||||
Loading…
Reference in new issue