feat(rf1): Store index ref in metastore (#13613)

pull/13148/head
Cyril Tovena 2 years ago committed by GitHub
parent f586c00a9f
commit 5f5fd4e4ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 364
      pkg/ingester-rf1/metastore/metastorepb/metastore.pb.go
  2. 10
      pkg/ingester-rf1/metastore/metastorepb/metastore.proto
  3. 3
      pkg/querier/querier.go
  4. 6
      pkg/storage/wal/segment.go
  5. 10
      pkg/storage/wal/segment_test.go

@ -6,6 +6,7 @@ package metastorepb
import (
context "context"
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
@ -112,7 +113,8 @@ type BlockMeta struct {
MinTime int64 `protobuf:"varint,3,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"`
MaxTime int64 `protobuf:"varint,4,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"`
CompactionLevel uint32 `protobuf:"varint,6,opt,name=compaction_level,json=compactionLevel,proto3" json:"compaction_level,omitempty"`
TenantStreams []*TenantStreams `protobuf:"bytes,7,rep,name=tenant_streams,json=tenantStreams,proto3" json:"tenant_streams,omitempty"`
IndexRef DataRef `protobuf:"bytes,7,opt,name=indexRef,proto3" json:"indexRef"`
TenantStreams []*TenantStreams `protobuf:"bytes,8,rep,name=tenant_streams,json=tenantStreams,proto3" json:"tenant_streams,omitempty"`
}
func (m *BlockMeta) Reset() { *m = BlockMeta{} }
@ -182,6 +184,13 @@ func (m *BlockMeta) GetCompactionLevel() uint32 {
return 0
}
func (m *BlockMeta) GetIndexRef() DataRef {
if m != nil {
return m.IndexRef
}
return DataRef{}
}
func (m *BlockMeta) GetTenantStreams() []*TenantStreams {
if m != nil {
return m.TenantStreams
@ -189,6 +198,57 @@ func (m *BlockMeta) GetTenantStreams() []*TenantStreams {
return nil
}
type DataRef struct {
Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"`
Length int64 `protobuf:"varint,2,opt,name=length,proto3" json:"length,omitempty"`
}
func (m *DataRef) Reset() { *m = DataRef{} }
func (*DataRef) ProtoMessage() {}
func (*DataRef) Descriptor() ([]byte, []int) {
return fileDescriptor_43ce85359599db4e, []int{3}
}
func (m *DataRef) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *DataRef) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_DataRef.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *DataRef) XXX_Merge(src proto.Message) {
xxx_messageInfo_DataRef.Merge(m, src)
}
func (m *DataRef) XXX_Size() int {
return m.Size()
}
func (m *DataRef) XXX_DiscardUnknown() {
xxx_messageInfo_DataRef.DiscardUnknown(m)
}
var xxx_messageInfo_DataRef proto.InternalMessageInfo
func (m *DataRef) GetOffset() int64 {
if m != nil {
return m.Offset
}
return 0
}
func (m *DataRef) GetLength() int64 {
if m != nil {
return m.Length
}
return 0
}
// TenantStreams object points to the offset in the block at which
// the tenant streams data is located.
type TenantStreams struct {
@ -200,7 +260,7 @@ type TenantStreams struct {
func (m *TenantStreams) Reset() { *m = TenantStreams{} }
func (*TenantStreams) ProtoMessage() {}
func (*TenantStreams) Descriptor() ([]byte, []int) {
return fileDescriptor_43ce85359599db4e, []int{3}
return fileDescriptor_43ce85359599db4e, []int{4}
}
func (m *TenantStreams) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -254,6 +314,7 @@ func init() {
proto.RegisterType((*AddBlockRequest)(nil), "metastorepb.AddBlockRequest")
proto.RegisterType((*AddBlockResponse)(nil), "metastorepb.AddBlockResponse")
proto.RegisterType((*BlockMeta)(nil), "metastorepb.BlockMeta")
proto.RegisterType((*DataRef)(nil), "metastorepb.DataRef")
proto.RegisterType((*TenantStreams)(nil), "metastorepb.TenantStreams")
}
@ -262,32 +323,37 @@ func init() {
}
var fileDescriptor_43ce85359599db4e = []byte{
// 400 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x41, 0x6f, 0xd3, 0x30,
0x14, 0xc7, 0xe3, 0x76, 0x6c, 0x8d, 0xab, 0x74, 0x95, 0x0f, 0x28, 0x0c, 0xb0, 0xa2, 0x48, 0x48,
0x41, 0x82, 0x4e, 0x14, 0xc4, 0x15, 0x6d, 0xb7, 0x49, 0xec, 0xe2, 0x4d, 0xdc, 0x50, 0xe4, 0x24,
0x6f, 0x93, 0xb5, 0xda, 0x0e, 0xb6, 0xa9, 0x76, 0xe4, 0x23, 0xf0, 0x31, 0xf8, 0x28, 0x1c, 0x7b,
0xec, 0x91, 0xa6, 0x12, 0xe2, 0xd8, 0x8f, 0x80, 0x9a, 0xb4, 0xa4, 0x45, 0xe2, 0xc2, 0xed, 0xbd,
0xff, 0xef, 0xaf, 0xa7, 0xf7, 0xf7, 0x33, 0x7e, 0x5b, 0xde, 0xdd, 0x9e, 0x0a, 0x75, 0x0b, 0xd6,
0x81, 0x79, 0x69, 0x6e, 0x5e, 0x9d, 0x4a, 0x70, 0xdc, 0x3a, 0x6d, 0xa0, 0xad, 0xca, 0xac, 0xad,
0x47, 0xa5, 0xd1, 0x4e, 0x93, 0xfe, 0x0e, 0x8c, 0xdf, 0xe1, 0xe3, 0xb3, 0xa2, 0x38, 0x9f, 0xe8,
0xfc, 0x8e, 0xc1, 0xa7, 0xcf, 0x60, 0x1d, 0x79, 0x81, 0x1f, 0x64, 0xeb, 0x3e, 0x44, 0x11, 0x4a,
0xfa, 0xe3, 0x87, 0xa3, 0x1d, 0xff, 0xa8, 0x76, 0x5e, 0x82, 0xe3, 0xac, 0x31, 0xc5, 0x04, 0x0f,
0xdb, 0x01, 0xb6, 0xd4, 0xca, 0x42, 0xfc, 0x13, 0x61, 0xff, 0x8f, 0x91, 0x3c, 0xc3, 0x83, 0x1b,
0x6d, 0x24, 0x77, 0xe9, 0x14, 0x8c, 0x15, 0x5a, 0xd5, 0x83, 0x0f, 0x58, 0xd0, 0xa8, 0x1f, 0x1a,
0x91, 0x0c, 0x70, 0x47, 0x14, 0x61, 0x27, 0x42, 0x89, 0xcf, 0x3a, 0xa2, 0x20, 0x8f, 0x70, 0x4f,
0x0a, 0x95, 0x3a, 0x21, 0x21, 0xec, 0x46, 0x28, 0xe9, 0xb2, 0x23, 0x29, 0xd4, 0xb5, 0x90, 0x50,
0x23, 0x7e, 0xdf, 0xa0, 0x83, 0x0d, 0xe2, 0xf7, 0x35, 0x7a, 0x8e, 0x87, 0xb9, 0x96, 0x25, 0xcf,
0x9d, 0xd0, 0x2a, 0x9d, 0xc0, 0x14, 0x26, 0xe1, 0x61, 0x84, 0x92, 0x80, 0x1d, 0xb7, 0xfa, 0xfb,
0xb5, 0x4c, 0xce, 0xf0, 0xc0, 0x81, 0xe2, 0xca, 0xa5, 0xd6, 0x19, 0xe0, 0xd2, 0x86, 0x47, 0x51,
0x37, 0xe9, 0x8f, 0x4f, 0xf6, 0x02, 0x5f, 0xd7, 0x96, 0xab, 0xc6, 0xc1, 0x02, 0xb7, 0xdb, 0xc6,
0x19, 0x0e, 0xf6, 0x38, 0x79, 0x8c, 0xfd, 0xcd, 0x4c, 0x51, 0xd4, 0x31, 0x7d, 0xd6, 0x6b, 0x84,
0x8b, 0xff, 0x4c, 0x34, 0xfe, 0x88, 0x87, 0x97, 0xdb, 0x7d, 0xae, 0xc0, 0x4c, 0x45, 0x0e, 0xe4,
0x02, 0xf7, 0xb6, 0x8f, 0x4e, 0x9e, 0xec, 0xad, 0xfb, 0xd7, 0x31, 0x4f, 0x9e, 0xfe, 0x83, 0x6e,
0x2e, 0xe5, 0x9d, 0xbf, 0x99, 0x2d, 0xa8, 0x37, 0x5f, 0x50, 0x6f, 0xb5, 0xa0, 0xe8, 0x4b, 0x45,
0xd1, 0xb7, 0x8a, 0xa2, 0xef, 0x15, 0x45, 0xb3, 0x8a, 0xa2, 0x1f, 0x15, 0x45, 0xbf, 0x2a, 0xea,
0xad, 0x2a, 0x8a, 0xbe, 0x2e, 0xa9, 0x37, 0x5b, 0x52, 0x6f, 0xbe, 0xa4, 0x5e, 0x76, 0x58, 0x7f,
0xa5, 0xd7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x05, 0x41, 0x71, 0x83, 0x84, 0x02, 0x00, 0x00,
// 471 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcf, 0x8a, 0xd3, 0x40,
0x18, 0xcf, 0xb4, 0xb5, 0x4d, 0xa7, 0xb4, 0x5b, 0x86, 0x65, 0x89, 0x55, 0xc7, 0x10, 0x10, 0x22,
0x68, 0x8b, 0x55, 0x16, 0x3c, 0xc9, 0x16, 0x2f, 0x0b, 0xee, 0x65, 0x76, 0xf1, 0x26, 0x65, 0xda,
0x7c, 0x89, 0xc3, 0x36, 0x33, 0x31, 0x19, 0x4b, 0x8f, 0x3e, 0x82, 0x4f, 0xe0, 0xd9, 0x47, 0xd9,
0x63, 0x8f, 0x7b, 0x12, 0x9b, 0x5e, 0x3c, 0xee, 0x23, 0x48, 0x27, 0xd9, 0x6d, 0x2b, 0x78, 0xd9,
0xdb, 0xf7, 0xfb, 0x93, 0x1f, 0xf9, 0x7d, 0xf3, 0xe1, 0xe3, 0xe4, 0x32, 0x1a, 0x08, 0x19, 0x41,
0xa6, 0x21, 0x7d, 0x99, 0x86, 0xaf, 0x06, 0x31, 0x68, 0x9e, 0x69, 0x95, 0xc2, 0x76, 0x4a, 0x26,
0xdb, 0xb9, 0x9f, 0xa4, 0x4a, 0x2b, 0xd2, 0xda, 0x11, 0x7b, 0x87, 0x91, 0x8a, 0x94, 0xe1, 0x07,
0x9b, 0xa9, 0xb0, 0x78, 0xef, 0xf0, 0xc1, 0x49, 0x10, 0x8c, 0x66, 0x6a, 0x7a, 0xc9, 0xe0, 0xcb,
0x57, 0xc8, 0x34, 0x79, 0x81, 0x1f, 0x4c, 0x36, 0xd8, 0x41, 0x2e, 0xf2, 0x5b, 0xc3, 0xa3, 0xfe,
0x4e, 0x4a, 0xdf, 0x38, 0xcf, 0x40, 0x73, 0x56, 0x98, 0x3c, 0x82, 0xbb, 0xdb, 0x80, 0x2c, 0x51,
0x32, 0x03, 0xef, 0x47, 0x05, 0x37, 0xef, 0x8c, 0xe4, 0x19, 0xee, 0x84, 0x2a, 0x8d, 0xb9, 0x1e,
0xcf, 0x21, 0xcd, 0x84, 0x92, 0x26, 0xb8, 0xc6, 0xda, 0x05, 0xfb, 0xb1, 0x20, 0x49, 0x07, 0x57,
0x44, 0xe0, 0x54, 0x5c, 0xe4, 0x37, 0x59, 0x45, 0x04, 0xe4, 0x21, 0xb6, 0x63, 0x21, 0xc7, 0x5a,
0xc4, 0xe0, 0x54, 0x5d, 0xe4, 0x57, 0x59, 0x23, 0x16, 0xf2, 0x42, 0xc4, 0x60, 0x24, 0xbe, 0x28,
0xa4, 0x5a, 0x29, 0xf1, 0x85, 0x91, 0x9e, 0xe3, 0xee, 0x54, 0xc5, 0x09, 0x9f, 0x6a, 0xa1, 0xe4,
0x78, 0x06, 0x73, 0x98, 0x39, 0x75, 0x17, 0xf9, 0x6d, 0x76, 0xb0, 0xe5, 0x3f, 0x6c, 0x68, 0x72,
0x8c, 0x6d, 0x21, 0x03, 0x58, 0x30, 0x08, 0x9d, 0x86, 0xa9, 0x7a, 0xb8, 0x57, 0xf5, 0x3d, 0xd7,
0x9c, 0x41, 0x38, 0xaa, 0x5d, 0xfd, 0x7a, 0x6a, 0xb1, 0x3b, 0x2f, 0x39, 0xc1, 0x1d, 0x0d, 0x92,
0x4b, 0x3d, 0xce, 0x74, 0x0a, 0x3c, 0xce, 0x1c, 0xdb, 0xad, 0xfa, 0xad, 0x61, 0x6f, 0xef, 0xeb,
0x0b, 0x63, 0x39, 0x2f, 0x1c, 0xac, 0xad, 0x77, 0xa1, 0xf7, 0x16, 0x37, 0xca, 0x74, 0x72, 0x84,
0xeb, 0x2a, 0x0c, 0x33, 0xd0, 0x66, 0x2b, 0x55, 0x56, 0xa2, 0x0d, 0x3f, 0x03, 0x19, 0xe9, 0xcf,
0x66, 0x25, 0x55, 0x56, 0x22, 0x6f, 0x82, 0xdb, 0x7b, 0xd1, 0xe4, 0x11, 0x6e, 0x96, 0xbf, 0x23,
0x02, 0x93, 0xd1, 0x64, 0x76, 0x41, 0x9c, 0xde, 0x73, 0x89, 0xc3, 0x4f, 0xb8, 0x7b, 0x76, 0x5b,
0xe5, 0x1c, 0xd2, 0xb9, 0x98, 0x02, 0x39, 0xc5, 0xf6, 0xed, 0x3b, 0x93, 0xc7, 0x7b, 0x4d, 0xff,
0xb9, 0x9f, 0xde, 0x93, 0xff, 0xa8, 0xe5, 0x71, 0x58, 0xa3, 0x37, 0xcb, 0x15, 0xb5, 0xae, 0x57,
0xd4, 0xba, 0x59, 0x51, 0xf4, 0x2d, 0xa7, 0xe8, 0x67, 0x4e, 0xd1, 0x55, 0x4e, 0xd1, 0x32, 0xa7,
0xe8, 0x77, 0x4e, 0xd1, 0x9f, 0x9c, 0x5a, 0x37, 0x39, 0x45, 0xdf, 0xd7, 0xd4, 0x5a, 0xae, 0xa9,
0x75, 0xbd, 0xa6, 0xd6, 0xa4, 0x6e, 0x0e, 0xf6, 0xf5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x62,
0x2d, 0xad, 0xa0, 0x0d, 0x03, 0x00, 0x00,
}
func (this *AddBlockRequest) Equal(that interface{}) bool {
@ -369,6 +435,9 @@ func (this *BlockMeta) Equal(that interface{}) bool {
if this.CompactionLevel != that1.CompactionLevel {
return false
}
if !this.IndexRef.Equal(&that1.IndexRef) {
return false
}
if len(this.TenantStreams) != len(that1.TenantStreams) {
return false
}
@ -379,6 +448,33 @@ func (this *BlockMeta) Equal(that interface{}) bool {
}
return true
}
func (this *DataRef) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*DataRef)
if !ok {
that2, ok := that.(DataRef)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Offset != that1.Offset {
return false
}
if this.Length != that1.Length {
return false
}
return true
}
func (this *TenantStreams) Equal(that interface{}) bool {
if that == nil {
return this == nil
@ -434,19 +530,31 @@ func (this *BlockMeta) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 10)
s := make([]string, 0, 11)
s = append(s, "&metastorepb.BlockMeta{")
s = append(s, "FormatVersion: "+fmt.Sprintf("%#v", this.FormatVersion)+",\n")
s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n")
s = append(s, "MinTime: "+fmt.Sprintf("%#v", this.MinTime)+",\n")
s = append(s, "MaxTime: "+fmt.Sprintf("%#v", this.MaxTime)+",\n")
s = append(s, "CompactionLevel: "+fmt.Sprintf("%#v", this.CompactionLevel)+",\n")
s = append(s, "IndexRef: "+strings.Replace(this.IndexRef.GoString(), `&`, ``, 1)+",\n")
if this.TenantStreams != nil {
s = append(s, "TenantStreams: "+fmt.Sprintf("%#v", this.TenantStreams)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
func (this *DataRef) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s = append(s, "&metastorepb.DataRef{")
s = append(s, "Offset: "+fmt.Sprintf("%#v", this.Offset)+",\n")
s = append(s, "Length: "+fmt.Sprintf("%#v", this.Length)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *TenantStreams) GoString() string {
if this == nil {
return "nil"
@ -637,9 +745,19 @@ func (m *BlockMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i = encodeVarintMetastore(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x3a
dAtA[i] = 0x42
}
}
{
size, err := m.IndexRef.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintMetastore(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x3a
if m.CompactionLevel != 0 {
i = encodeVarintMetastore(dAtA, i, uint64(m.CompactionLevel))
i--
@ -670,6 +788,39 @@ func (m *BlockMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *DataRef) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *DataRef) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *DataRef) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Length != 0 {
i = encodeVarintMetastore(dAtA, i, uint64(m.Length))
i--
dAtA[i] = 0x10
}
if m.Offset != 0 {
i = encodeVarintMetastore(dAtA, i, uint64(m.Offset))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *TenantStreams) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -765,6 +916,8 @@ func (m *BlockMeta) Size() (n int) {
if m.CompactionLevel != 0 {
n += 1 + sovMetastore(uint64(m.CompactionLevel))
}
l = m.IndexRef.Size()
n += 1 + l + sovMetastore(uint64(l))
if len(m.TenantStreams) > 0 {
for _, e := range m.TenantStreams {
l = e.Size()
@ -774,6 +927,21 @@ func (m *BlockMeta) Size() (n int) {
return n
}
func (m *DataRef) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Offset != 0 {
n += 1 + sovMetastore(uint64(m.Offset))
}
if m.Length != 0 {
n += 1 + sovMetastore(uint64(m.Length))
}
return n
}
func (m *TenantStreams) Size() (n int) {
if m == nil {
return 0
@ -833,11 +1001,23 @@ func (this *BlockMeta) String() string {
`MinTime:` + fmt.Sprintf("%v", this.MinTime) + `,`,
`MaxTime:` + fmt.Sprintf("%v", this.MaxTime) + `,`,
`CompactionLevel:` + fmt.Sprintf("%v", this.CompactionLevel) + `,`,
`IndexRef:` + strings.Replace(strings.Replace(this.IndexRef.String(), "DataRef", "DataRef", 1), `&`, ``, 1) + `,`,
`TenantStreams:` + repeatedStringForTenantStreams + `,`,
`}`,
}, "")
return s
}
func (this *DataRef) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&DataRef{`,
`Offset:` + fmt.Sprintf("%v", this.Offset) + `,`,
`Length:` + fmt.Sprintf("%v", this.Length) + `,`,
`}`,
}, "")
return s
}
func (this *TenantStreams) String() string {
if this == nil {
return "nil"
@ -1138,6 +1318,39 @@ func (m *BlockMeta) Unmarshal(dAtA []byte) error {
}
}
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field IndexRef", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetastore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthMetastore
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthMetastore
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.IndexRef.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TenantStreams", wireType)
}
@ -1195,6 +1408,97 @@ func (m *BlockMeta) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *DataRef) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetastore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: DataRef: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: DataRef: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType)
}
m.Offset = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetastore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Offset |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Length", wireType)
}
m.Length = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowMetastore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Length |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipMetastore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthMetastore
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthMetastore
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *TenantStreams) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

@ -2,6 +2,8 @@ syntax = "proto3";
package metastorepb;
import "gogoproto/gogo.proto";
service MetastoreService {
rpc AddBlock(AddBlockRequest) returns (AddBlockResponse) {}
}
@ -18,8 +20,14 @@ message BlockMeta {
int64 min_time = 3;
int64 max_time = 4;
uint32 compaction_level = 6;
DataRef indexRef = 7 [(gogoproto.nullable) = false];
repeated TenantStreams tenant_streams = 8;
}
repeated TenantStreams tenant_streams = 7;
message DataRef {
int64 offset = 1;
int64 length = 2;
}
// TenantStreams object points to the offset in the block at which

@ -24,7 +24,6 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/iter"
@ -43,6 +42,8 @@ import (
listutil "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/v3/pkg/util/validation"
"github.com/grafana/loki/pkg/push"
)
const (

@ -54,6 +54,7 @@ type SegmentWriter struct {
inputSize atomic.Int64
idxWriter *index.Writer
consistencyMtx *sync.RWMutex
indexRef metastorepb.DataRef
}
type streamSegment struct {
@ -209,6 +210,7 @@ func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta {
Id: id,
FormatVersion: uint64(1),
CompactionLevel: 0,
IndexRef: b.indexRef,
MinTime: globalMinT,
MaxTime: globalMaxT,
TenantStreams: result,
@ -308,6 +310,8 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
if n != len(buf) {
return total, errors.New("invalid written index len")
}
b.indexRef.Offset = total
b.indexRef.Length = int64(n)
total += int64(n)
// write index len 4b
@ -349,6 +353,8 @@ func (b *SegmentWriter) Reset() {
b.streams = make(map[streamID]*streamSegment, 64)
b.buf1.Reset()
b.inputSize.Store(0)
b.indexRef.Length = 0
b.indexRef.Offset = 0
}
// InputSize returns the total size of the input data written to the writer.

@ -16,6 +16,7 @@ import (
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/wal/index"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
"github.com/grafana/loki/pkg/push"
@ -448,6 +449,8 @@ func TestReset(t *testing.T) {
func Test_Meta(t *testing.T) {
w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
buff := bytes.NewBuffer(nil)
require.NoError(t, err)
lbls := labels.FromStrings("container", "foo", "namespace", "dev")
@ -462,7 +465,13 @@ func Test_Meta(t *testing.T) {
{Timestamp: time.Unix(3, 0), Line: "Entry 2"},
{Timestamp: time.Unix(4, 0), Line: "Entry 3"},
})
_, err = w.WriteTo(buff)
require.NoError(t, err)
meta := w.Meta("bar")
indexReader, err := index.NewReader(index.RealByteSlice(buff.Bytes()[meta.IndexRef.Offset : meta.IndexRef.Offset+meta.IndexRef.Length]))
require.NoError(t, err)
defer indexReader.Close()
require.Equal(t, &metastorepb.BlockMeta{
FormatVersion: 1,
@ -470,6 +479,7 @@ func Test_Meta(t *testing.T) {
MinTime: time.Unix(1, 0).UnixNano(),
MaxTime: time.Unix(4, 0).UnixNano(),
CompactionLevel: 0,
IndexRef: meta.IndexRef,
TenantStreams: []*metastorepb.TenantStreams{
{
TenantId: "tenanta",

Loading…
Cancel
Save