diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 825bcd688e..9b62ea953e 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -54,6 +54,8 @@ type partitionProcessor struct { wg sync.WaitGroup reg prometheus.Registerer logger log.Logger + + eventsProducerClient *kgo.Client } func newPartitionProcessor( @@ -70,6 +72,7 @@ func newPartitionProcessor( reg prometheus.Registerer, bufPool *sync.Pool, idleFlushTimeout time.Duration, + eventsProducerClient *kgo.Client, ) *partitionProcessor { ctx, cancel := context.WithCancel(ctx) decoder, err := kafka.NewDecoder() @@ -99,25 +102,26 @@ func newPartitionProcessor( } return &partitionProcessor{ - client: client, - logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), - topic: topic, - partition: partition, - records: make(chan *kgo.Record, 1000), - ctx: ctx, - cancel: cancel, - decoder: decoder, - reg: reg, - builderCfg: builderCfg, - bucket: bucket, - tenantID: []byte(tenantID), - metrics: metrics, - uploader: uploader, - metastoreUpdater: metastoreUpdater, - bufPool: bufPool, - idleFlushTimeout: idleFlushTimeout, - lastFlush: time.Now(), - lastModified: time.Now(), + client: client, + logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), + topic: topic, + partition: partition, + records: make(chan *kgo.Record, 1000), + ctx: ctx, + cancel: cancel, + decoder: decoder, + reg: reg, + builderCfg: builderCfg, + bucket: bucket, + tenantID: []byte(tenantID), + metrics: metrics, + uploader: uploader, + metastoreUpdater: metastoreUpdater, + bufPool: bufPool, + idleFlushTimeout: idleFlushTimeout, + lastFlush: time.Now(), + lastModified: time.Now(), + eventsProducerClient: eventsProducerClient, } } @@ -207,11 +211,46 @@ func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error { return err } + if err := p.emitObjectWrittenEvent(objectPath); err != nil { + level.Error(p.logger).Log("msg", "failed to emit event", "err", err) + return err + } + p.lastFlush = time.Now() return nil } +func (p *partitionProcessor) emitObjectWrittenEvent(objectPath string) error { + if p.eventsProducerClient == nil { + return nil + } + + event := &metastore.ObjectWrittenEvent{ + Tenant: string(p.tenantID), + ObjectPath: objectPath, + WriteTime: time.Now().Format(time.RFC3339), + } + + eventBytes, err := event.Marshal() + if err != nil { + level.Error(p.logger).Log("msg", "failed to marshal metastore event", "err", err) + return err + } + + // Emitting the event is non-critical so we don't need to wait for it. + // We can just log the error and move on. + p.eventsProducerClient.Produce(p.ctx, &kgo.Record{ + Value: eventBytes, + }, func(_ *kgo.Record, err error) { + if err != nil { + level.Error(p.logger).Log("msg", "failed to produce metastore event", "err", err) + } + }) + + return nil +} + func (p *partitionProcessor) processRecord(record *kgo.Record) { // Update offset metric at the end of processing defer p.metrics.updateOffset(record.Offset) diff --git a/pkg/dataobj/consumer/partition_processor_test.go b/pkg/dataobj/consumer/partition_processor_test.go index cea2f2a49e..1a6bf3d2ed 100644 --- a/pkg/dataobj/consumer/partition_processor_test.go +++ b/pkg/dataobj/consumer/partition_processor_test.go @@ -166,6 +166,7 @@ func TestIdleFlush(t *testing.T) { prometheus.NewRegistry(), bufPool, tc.idleTimeout, + nil, ) if tc.initBuilder { @@ -235,6 +236,7 @@ func TestIdleFlushWithActiveProcessing(t *testing.T) { prometheus.NewRegistry(), bufPool, 200*time.Millisecond, + nil, ) require.NoError(t, p.initBuilder()) @@ -296,6 +298,7 @@ func TestIdleFlushWithEmptyData(t *testing.T) { prometheus.NewRegistry(), bufPool, 200*time.Millisecond, + nil, ) require.NoError(t, p.initBuilder()) diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go index efc399d8bd..3201ee46c4 100644 --- a/pkg/dataobj/consumer/service.go +++ b/pkg/dataobj/consumer/service.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/kafka/partitionring/consumer" ) @@ -32,6 +33,8 @@ type Service struct { reg prometheus.Registerer client *consumer.Client + eventsProducerClient *kgo.Client + cfg Config bucket objstore.Bucket codec distributor.TenantPrefixCodec @@ -58,7 +61,7 @@ func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore. }, } - client, err := consumer.NewGroupClient( + consumerClient, err := consumer.NewGroupClient( kafkaCfg, partitionRing, groupName, @@ -76,7 +79,17 @@ func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore. level.Error(logger).Log("msg", "failed to create consumer", "err", err) return nil } - s.client = client + + eventsKafkaCfg := kafkaCfg + eventsKafkaCfg.Topic = "loki.metastore-events" + eventsProducerClient, err := client.NewWriterClient("loki.metastore-events", eventsKafkaCfg, 50, logger, reg) + if err != nil { + level.Error(logger).Log("msg", "failed to create producer", "err", err) + return nil + } + + s.client = consumerClient + s.eventsProducerClient = eventsProducerClient s.Service = services.NewBasicService(nil, s.run, s.stopping) return s } @@ -99,7 +112,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie } for _, partition := range parts { - processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool, s.cfg.IdleFlushTimeout) + processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool, s.cfg.IdleFlushTimeout, s.eventsProducerClient) s.partitionHandlers[topic][partition] = processor processor.start() } diff --git a/pkg/dataobj/metastore/metastore.pb.go b/pkg/dataobj/metastore/metastore.pb.go new file mode 100644 index 0000000000..acf4f0fbfc --- /dev/null +++ b/pkg/dataobj/metastore/metastore.pb.go @@ -0,0 +1,524 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/dataobj/metastore/metastore.proto + +package metastore + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// ColumnInfo describes an individual column within a data set. +type ObjectWrittenEvent struct { + Tenant string `protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"` + ObjectPath string `protobuf:"bytes,2,opt,name=objectPath,proto3" json:"objectPath,omitempty"` + WriteTime string `protobuf:"bytes,3,opt,name=writeTime,proto3" json:"writeTime,omitempty"` +} + +func (m *ObjectWrittenEvent) Reset() { *m = ObjectWrittenEvent{} } +func (*ObjectWrittenEvent) ProtoMessage() {} +func (*ObjectWrittenEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_fdfd617758a99d3c, []int{0} +} +func (m *ObjectWrittenEvent) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ObjectWrittenEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ObjectWrittenEvent.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ObjectWrittenEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_ObjectWrittenEvent.Merge(m, src) +} +func (m *ObjectWrittenEvent) XXX_Size() int { + return m.Size() +} +func (m *ObjectWrittenEvent) XXX_DiscardUnknown() { + xxx_messageInfo_ObjectWrittenEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_ObjectWrittenEvent proto.InternalMessageInfo + +func (m *ObjectWrittenEvent) GetTenant() string { + if m != nil { + return m.Tenant + } + return "" +} + +func (m *ObjectWrittenEvent) GetObjectPath() string { + if m != nil { + return m.ObjectPath + } + return "" +} + +func (m *ObjectWrittenEvent) GetWriteTime() string { + if m != nil { + return m.WriteTime + } + return "" +} + +func init() { + proto.RegisterType((*ObjectWrittenEvent)(nil), "dataobj.metastore.ObjectWrittenEvent") +} + +func init() { + proto.RegisterFile("pkg/dataobj/metastore/metastore.proto", fileDescriptor_fdfd617758a99d3c) +} + +var fileDescriptor_fdfd617758a99d3c = []byte{ + // 232 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x2d, 0xc8, 0x4e, 0xd7, + 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0x4d, 0x2d, 0x49, 0x2c, 0x2e, 0xc9, 0x2f, + 0x4a, 0x45, 0xb0, 0xf4, 0x0a, 0x8a, 0xf2, 0x4b, 0xf2, 0x85, 0x04, 0xa1, 0x4a, 0xf4, 0xe0, 0x12, + 0x4a, 0x59, 0x5c, 0x42, 0xfe, 0x49, 0x59, 0xa9, 0xc9, 0x25, 0xe1, 0x45, 0x99, 0x25, 0x25, 0xa9, + 0x79, 0xae, 0x65, 0xa9, 0x79, 0x25, 0x42, 0x62, 0x5c, 0x6c, 0x25, 0xa9, 0x79, 0x89, 0x79, 0x25, + 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x50, 0x9e, 0x90, 0x1c, 0x17, 0x57, 0x3e, 0x58, 0x75, + 0x40, 0x62, 0x49, 0x86, 0x04, 0x13, 0x58, 0x0e, 0x49, 0x44, 0x48, 0x86, 0x8b, 0xb3, 0xbc, 0x28, + 0xb3, 0x24, 0x35, 0x24, 0x33, 0x37, 0x55, 0x82, 0x19, 0x2c, 0x8d, 0x10, 0x70, 0x4a, 0xbb, 0xf0, + 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, 0x31, 0xae, + 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, + 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb, + 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x65, 0x90, 0x9e, 0x59, 0x92, 0x51, 0x9a, 0xa4, 0x97, 0x9c, + 0x9f, 0xab, 0x9f, 0x5e, 0x94, 0x98, 0x96, 0x98, 0x97, 0xa8, 0x9f, 0x93, 0x9f, 0x9d, 0xa9, 0x5f, + 0x66, 0xac, 0x8f, 0xd5, 0xdb, 0x49, 0x6c, 0x60, 0xdf, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, + 0xd3, 0x5f, 0x17, 0xa4, 0x16, 0x01, 0x00, 0x00, +} + +func (this *ObjectWrittenEvent) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ObjectWrittenEvent) + if !ok { + that2, ok := that.(ObjectWrittenEvent) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Tenant != that1.Tenant { + return false + } + if this.ObjectPath != that1.ObjectPath { + return false + } + if this.WriteTime != that1.WriteTime { + return false + } + return true +} +func (this *ObjectWrittenEvent) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&metastore.ObjectWrittenEvent{") + s = append(s, "Tenant: "+fmt.Sprintf("%#v", this.Tenant)+",\n") + s = append(s, "ObjectPath: "+fmt.Sprintf("%#v", this.ObjectPath)+",\n") + s = append(s, "WriteTime: "+fmt.Sprintf("%#v", this.WriteTime)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringMetastore(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *ObjectWrittenEvent) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ObjectWrittenEvent) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ObjectWrittenEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.WriteTime) > 0 { + i -= len(m.WriteTime) + copy(dAtA[i:], m.WriteTime) + i = encodeVarintMetastore(dAtA, i, uint64(len(m.WriteTime))) + i-- + dAtA[i] = 0x1a + } + if len(m.ObjectPath) > 0 { + i -= len(m.ObjectPath) + copy(dAtA[i:], m.ObjectPath) + i = encodeVarintMetastore(dAtA, i, uint64(len(m.ObjectPath))) + i-- + dAtA[i] = 0x12 + } + if len(m.Tenant) > 0 { + i -= len(m.Tenant) + copy(dAtA[i:], m.Tenant) + i = encodeVarintMetastore(dAtA, i, uint64(len(m.Tenant))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintMetastore(dAtA []byte, offset int, v uint64) int { + offset -= sovMetastore(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *ObjectWrittenEvent) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Tenant) + if l > 0 { + n += 1 + l + sovMetastore(uint64(l)) + } + l = len(m.ObjectPath) + if l > 0 { + n += 1 + l + sovMetastore(uint64(l)) + } + l = len(m.WriteTime) + if l > 0 { + n += 1 + l + sovMetastore(uint64(l)) + } + return n +} + +func sovMetastore(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozMetastore(x uint64) (n int) { + return sovMetastore(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *ObjectWrittenEvent) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ObjectWrittenEvent{`, + `Tenant:` + fmt.Sprintf("%v", this.Tenant) + `,`, + `ObjectPath:` + fmt.Sprintf("%v", this.ObjectPath) + `,`, + `WriteTime:` + fmt.Sprintf("%v", this.WriteTime) + `,`, + `}`, + }, "") + return s +} +func valueToStringMetastore(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *ObjectWrittenEvent) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ObjectWrittenEvent: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ObjectWrittenEvent: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetastore + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetastore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Tenant = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ObjectPath", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetastore + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetastore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ObjectPath = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field WriteTime", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMetastore + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMetastore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.WriteTime = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMetastore(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMetastore + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMetastore + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMetastore(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMetastore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMetastore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMetastore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthMetastore + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthMetastore + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMetastore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipMetastore(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthMetastore + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthMetastore = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMetastore = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/dataobj/metastore/metastore.proto b/pkg/dataobj/metastore/metastore.proto new file mode 100644 index 0000000000..d94c3b36a5 --- /dev/null +++ b/pkg/dataobj/metastore/metastore.proto @@ -0,0 +1,13 @@ +// metastore.proto holds types for describing metastore events +syntax = "proto3"; + +package dataobj.metastore; + +option go_package = "github.com/grafana/loki/v3/pkg/dataobj/metastore"; + +// ColumnInfo describes an individual column within a data set. +message ObjectWrittenEvent { + string tenant = 1; + string objectPath = 2; + string writeTime = 3; +}