prevents duplicate log lines from being replayed. closes #3378 (#3388)

pull/3344/head
Owen Diehl 5 years ago committed by GitHub
parent 74a97899dd
commit 7613197593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      pkg/ingester/checkpoint.go
  2. 177
      pkg/ingester/checkpoint.pb.go
  3. 3
      pkg/ingester/checkpoint.proto
  4. 9
      pkg/ingester/encoding_test.go
  5. 2
      pkg/ingester/recovery.go
  6. 85
      pkg/ingester/recovery_test.go

@ -269,6 +269,9 @@ func (s *streamIterator) Next() bool {
s.current.Fingerprint = uint64(stream.fp)
s.current.Labels = client.FromLabelsToLabelAdapters(stream.labels)
s.current.To = stream.lastLine.ts
s.current.LastLine = stream.lastLine.content
return true
}

@ -141,6 +141,9 @@ type Series struct {
Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"`
Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
// Last timestamp of the last chunk.
To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
}
func (m *Series) Reset() { *m = Series{} }
@ -196,6 +199,20 @@ func (m *Series) GetChunks() []Chunk {
return nil
}
func (m *Series) GetTo() time.Time {
if m != nil {
return m.To
}
return time.Time{}
}
func (m *Series) GetLastLine() string {
if m != nil {
return m.LastLine
}
return ""
}
func init() {
proto.RegisterType((*Chunk)(nil), "loki_ingester.Chunk")
proto.RegisterType((*Series)(nil), "loki_ingester.Series")
@ -204,37 +221,38 @@ func init() {
func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) }
var fileDescriptor_00f4b7152db9bdb5 = []byte{
// 473 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xb1, 0x8e, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x3e, 0x93, 0xdb, 0x40, 0xc1, 0x0a, 0xa1, 0x55, 0x24, 0x36, 0xd6, 0x55, 0x69,
0xb0, 0xa5, 0x40, 0x41, 0x85, 0x38, 0x83, 0x10, 0x48, 0x14, 0xc8, 0x40, 0x43, 0x83, 0x1c, 0x7b,
0x62, 0x9b, 0x38, 0x5e, 0x6b, 0x77, 0x2d, 0x41, 0xc7, 0x27, 0xdc, 0x67, 0xf0, 0x29, 0x57, 0xa6,
0x3c, 0x81, 0x74, 0x10, 0xa7, 0x41, 0xa2, 0xb9, 0x4f, 0x40, 0xbb, 0xb6, 0x75, 0xb9, 0x32, 0x74,
0xf3, 0xde, 0xcc, 0x7b, 0x33, 0xbb, 0x33, 0xf8, 0x41, 0xb5, 0x4a, 0xfd, 0xbc, 0x4c, 0x41, 0x2a,
0x10, 0x7e, 0x9c, 0x41, 0xbc, 0xaa, 0x78, 0x5e, 0x2a, 0xaf, 0x12, 0x5c, 0x71, 0x72, 0xa7, 0xe0,
0xab, 0xfc, 0x53, 0x9f, 0x9f, 0x4c, 0x53, 0xce, 0xd3, 0x02, 0x7c, 0x93, 0x5c, 0xd4, 0x4b, 0x5f,
0xe5, 0x6b, 0x90, 0x2a, 0x5a, 0x57, 0x6d, 0xfd, 0xe4, 0x61, 0x9a, 0xab, 0xac, 0x5e, 0x78, 0x31,
0x5f, 0xfb, 0x29, 0x4f, 0xf9, 0x75, 0xa5, 0x46, 0x06, 0x98, 0xa8, 0x2b, 0x7f, 0xb6, 0x57, 0x1e,
0x73, 0xa1, 0xe0, 0x4b, 0x25, 0xf8, 0x67, 0x88, 0x55, 0x87, 0xfc, 0x9b, 0xd3, 0x15, 0x39, 0x94,
0x7d, 0xaa, 0x75, 0x38, 0xf9, 0x39, 0xc0, 0x47, 0xcf, 0xb3, 0xba, 0x5c, 0x91, 0x27, 0xd8, 0x5e,
0x0a, 0xbe, 0xa6, 0xc8, 0x45, 0xb3, 0xf1, 0x7c, 0xe2, 0xb5, 0xa3, 0x7a, 0xfd, 0x00, 0xde, 0xfb,
0x7e, 0xd4, 0x60, 0x74, 0x7e, 0x39, 0xb5, 0xce, 0x7e, 0x4d, 0x51, 0x68, 0x14, 0xe4, 0x31, 0x1e,
0x28, 0x4e, 0x07, 0x07, 0xe8, 0x06, 0x8a, 0x93, 0x00, 0x1f, 0x2f, 0x8b, 0x5a, 0x66, 0x90, 0x9c,
0x2a, 0x3a, 0x3c, 0x40, 0x7c, 0x2d, 0x23, 0x2f, 0xf1, 0xb8, 0x88, 0xa4, 0xfa, 0x50, 0x25, 0x91,
0x82, 0x84, 0xda, 0x07, 0xb8, 0xec, 0x0b, 0xc9, 0x7d, 0xec, 0xc4, 0x05, 0x97, 0x90, 0xd0, 0x23,
0x17, 0xcd, 0x46, 0x61, 0x87, 0x34, 0x2f, 0xbf, 0x96, 0x31, 0x24, 0xd4, 0x69, 0xf9, 0x16, 0x11,
0x82, 0xed, 0x24, 0x52, 0x11, 0xbd, 0xe5, 0xa2, 0xd9, 0xed, 0xd0, 0xc4, 0x9a, 0xcb, 0x20, 0x4a,
0xe8, 0xa8, 0xe5, 0x74, 0x7c, 0xf2, 0x17, 0x61, 0xe7, 0x1d, 0x88, 0x1c, 0xa4, 0xb6, 0xaa, 0x25,
0x88, 0xd7, 0x2f, 0xcc, 0x07, 0x1f, 0x87, 0x1d, 0x22, 0x2e, 0x1e, 0x2f, 0xf5, 0x86, 0x44, 0x25,
0xf2, 0x52, 0x99, 0x5f, 0xb4, 0xc3, 0x7d, 0x8a, 0x48, 0xec, 0x14, 0xd1, 0x02, 0x0a, 0x49, 0x87,
0xee, 0x70, 0x36, 0x9e, 0xdf, 0xf5, 0xba, 0x0d, 0xbe, 0xd1, 0xec, 0xdb, 0x28, 0x17, 0xc1, 0x2b,
0xfd, 0xac, 0x1f, 0x97, 0xd3, 0xff, 0xb9, 0x87, 0xd6, 0xe6, 0x34, 0x89, 0x2a, 0x05, 0x22, 0xec,
0x5a, 0x91, 0x39, 0x76, 0x62, 0x7d, 0x16, 0x92, 0xda, 0xa6, 0xe9, 0x3d, 0xef, 0xc6, 0x25, 0x7b,
0xe6, 0x66, 0x02, 0x5b, 0xf7, 0x0d, 0xbb, 0xca, 0xe0, 0xe9, 0x66, 0xcb, 0xac, 0x8b, 0x2d, 0xb3,
0xae, 0xb6, 0x0c, 0x7d, 0x6b, 0x18, 0xfa, 0xde, 0x30, 0x74, 0xde, 0x30, 0xb4, 0x69, 0x18, 0xfa,
0xdd, 0x30, 0xf4, 0xa7, 0x61, 0xd6, 0x55, 0xc3, 0xd0, 0xd9, 0x8e, 0x59, 0x9b, 0x1d, 0xb3, 0x2e,
0x76, 0xcc, 0xfa, 0x38, 0xea, 0x3d, 0x17, 0x8e, 0x59, 0xd8, 0xa3, 0x7f, 0x01, 0x00, 0x00, 0xff,
0xff, 0xba, 0x57, 0x6a, 0x42, 0x54, 0x03, 0x00, 0x00,
// 491 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xbf, 0x8e, 0xd3, 0x30,
0x1c, 0x8e, 0x7b, 0x69, 0x48, 0x5d, 0x18, 0xb0, 0x10, 0xb2, 0x2a, 0xe1, 0x46, 0x37, 0x75, 0x21,
0x91, 0x0a, 0x03, 0x13, 0xe2, 0x02, 0x42, 0x20, 0xdd, 0x80, 0x02, 0x2c, 0x2c, 0x28, 0x4d, 0xdc,
0xc4, 0x34, 0x8d, 0x23, 0xdb, 0x91, 0x60, 0xe3, 0x11, 0xee, 0x31, 0x78, 0x09, 0xf6, 0x1b, 0x3b,
0x9e, 0x40, 0x3a, 0x68, 0xba, 0x30, 0xde, 0x23, 0x20, 0x3b, 0x09, 0xd7, 0x1b, 0xcb, 0xe6, 0xef,
0xfb, 0xfd, 0xfb, 0xfc, 0xfb, 0x7d, 0xf0, 0x41, 0xb5, 0xca, 0x02, 0x56, 0x66, 0x54, 0x2a, 0x2a,
0x82, 0x24, 0xa7, 0xc9, 0xaa, 0xe2, 0xac, 0x54, 0x7e, 0x25, 0xb8, 0xe2, 0xe8, 0x4e, 0xc1, 0x57,
0xec, 0x63, 0x1f, 0x9f, 0x4c, 0x33, 0xce, 0xb3, 0x82, 0x06, 0x26, 0xb8, 0xa8, 0x97, 0x81, 0x62,
0x6b, 0x2a, 0x55, 0xbc, 0xae, 0xda, 0xfc, 0xc9, 0xc3, 0x8c, 0xa9, 0xbc, 0x5e, 0xf8, 0x09, 0x5f,
0x07, 0x19, 0xcf, 0xf8, 0x75, 0xa6, 0x46, 0x06, 0x98, 0x57, 0x97, 0xfe, 0x6c, 0x2f, 0x3d, 0xe1,
0x42, 0xd1, 0xcf, 0x95, 0xe0, 0x9f, 0x68, 0xa2, 0x3a, 0x14, 0xdc, 0x54, 0x57, 0x30, 0x5a, 0xf6,
0xa1, 0xb6, 0xc3, 0xf1, 0xcf, 0x01, 0x1c, 0x3e, 0xcf, 0xeb, 0x72, 0x85, 0x9e, 0x40, 0x7b, 0x29,
0xf8, 0x1a, 0x03, 0x0f, 0xcc, 0xc6, 0xf3, 0x89, 0xdf, 0x4a, 0xf5, 0x7b, 0x01, 0xfe, 0xbb, 0x5e,
0x6a, 0xe8, 0x9e, 0x5f, 0x4e, 0xad, 0xb3, 0x5f, 0x53, 0x10, 0x99, 0x0a, 0xf4, 0x18, 0x0e, 0x14,
0xc7, 0x83, 0x03, 0xea, 0x06, 0x8a, 0xa3, 0x10, 0x8e, 0x96, 0x45, 0x2d, 0x73, 0x9a, 0x9e, 0x28,
0x7c, 0x74, 0x40, 0xf1, 0x75, 0x19, 0x7a, 0x09, 0xc7, 0x45, 0x2c, 0xd5, 0xfb, 0x2a, 0x8d, 0x15,
0x4d, 0xb1, 0x7d, 0x40, 0x97, 0xfd, 0x42, 0x74, 0x1f, 0x3a, 0x49, 0xc1, 0x25, 0x4d, 0xf1, 0xd0,
0x03, 0x33, 0x37, 0xea, 0x90, 0xe6, 0xe5, 0x97, 0x32, 0xa1, 0x29, 0x76, 0x5a, 0xbe, 0x45, 0x08,
0x41, 0x3b, 0x8d, 0x55, 0x8c, 0x6f, 0x79, 0x60, 0x76, 0x3b, 0x32, 0x6f, 0xcd, 0xe5, 0x34, 0x4e,
0xb1, 0xdb, 0x72, 0xfa, 0x7d, 0xfc, 0x7d, 0x00, 0x9d, 0xb7, 0x54, 0x30, 0x2a, 0x75, 0xab, 0x5a,
0x52, 0xf1, 0xfa, 0x85, 0x59, 0xf0, 0x28, 0xea, 0x10, 0xf2, 0xe0, 0x78, 0xa9, 0x2f, 0x24, 0x2a,
0xc1, 0x4a, 0x65, 0xb6, 0x68, 0x47, 0xfb, 0x14, 0x92, 0xd0, 0x29, 0xe2, 0x05, 0x2d, 0x24, 0x3e,
0xf2, 0x8e, 0x66, 0xe3, 0xf9, 0x5d, 0xbf, 0xbb, 0xe0, 0xa9, 0x66, 0xdf, 0xc4, 0x4c, 0x84, 0xaf,
0xf4, 0xb7, 0x7e, 0x5c, 0x4e, 0xff, 0xc7, 0x0f, 0x6d, 0x9b, 0x93, 0x34, 0xae, 0x14, 0x15, 0x51,
0x37, 0x0a, 0xcd, 0xa1, 0x93, 0x68, 0x5b, 0x48, 0x6c, 0x9b, 0xa1, 0xf7, 0xfc, 0x1b, 0x4e, 0xf6,
0x8d, 0x67, 0x42, 0x5b, 0xcf, 0x8d, 0xba, 0xcc, 0xce, 0x07, 0xc3, 0x03, 0x7d, 0x30, 0x81, 0xae,
0x3e, 0xc5, 0x29, 0x2b, 0xa9, 0xd9, 0xf2, 0x28, 0xfa, 0x87, 0xc3, 0xa7, 0x9b, 0x2d, 0xb1, 0x2e,
0xb6, 0xc4, 0xba, 0xda, 0x12, 0xf0, 0xb5, 0x21, 0xe0, 0x5b, 0x43, 0xc0, 0x79, 0x43, 0xc0, 0xa6,
0x21, 0xe0, 0x77, 0x43, 0xc0, 0x9f, 0x86, 0x58, 0x57, 0x0d, 0x01, 0x67, 0x3b, 0x62, 0x6d, 0x76,
0xc4, 0xba, 0xd8, 0x11, 0xeb, 0x83, 0xdb, 0xab, 0x5c, 0x38, 0x66, 0xfa, 0xa3, 0xbf, 0x01, 0x00,
0x00, 0xff, 0xff, 0x50, 0x7b, 0x2f, 0x61, 0xa6, 0x03, 0x00, 0x00,
}
func (this *Chunk) Equal(that interface{}) bool {
@ -323,6 +341,12 @@ func (this *Series) Equal(that interface{}) bool {
return false
}
}
if !this.To.Equal(that1.To) {
return false
}
if this.LastLine != that1.LastLine {
return false
}
return true
}
func (this *Chunk) GoString() string {
@ -346,7 +370,7 @@ func (this *Series) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 8)
s := make([]string, 0, 10)
s = append(s, "&ingester.Series{")
s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n")
s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n")
@ -358,6 +382,8 @@ func (this *Series) GoString() string {
}
s = append(s, "Chunks: "+fmt.Sprintf("%#v", vs)+",\n")
}
s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n")
s = append(s, "LastLine: "+fmt.Sprintf("%#v", this.LastLine)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -501,6 +527,20 @@ func (m *Series) MarshalTo(dAtA []byte) (int, error) {
i += n
}
}
dAtA[i] = 0x2a
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.To)))
n5, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i:])
if err != nil {
return 0, err
}
i += n5
if len(m.LastLine) > 0 {
dAtA[i] = 0x32
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.LastLine)))
i += copy(dAtA[i:], m.LastLine)
}
return i, nil
}
@ -569,6 +609,12 @@ func (m *Series) Size() (n int) {
n += 1 + l + sovCheckpoint(uint64(l))
}
}
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.To)
n += 1 + l + sovCheckpoint(uint64(l))
l = len(m.LastLine)
if l > 0 {
n += 1 + l + sovCheckpoint(uint64(l))
}
return n
}
@ -611,6 +657,8 @@ func (this *Series) String() string {
`Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`,
`Labels:` + fmt.Sprintf("%v", this.Labels) + `,`,
`Chunks:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Chunks), "Chunk", "Chunk", 1), `&`, ``, 1) + `,`,
`To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`LastLine:` + fmt.Sprintf("%v", this.LastLine) + `,`,
`}`,
}, "")
return s
@ -1064,6 +1112,71 @@ func (m *Series) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field To", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthCheckpoint
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthCheckpoint
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.To, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LastLine", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCheckpoint
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthCheckpoint
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.LastLine = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCheckpoint(dAtA[iNdEx:])

@ -30,4 +30,7 @@ message Series {
uint64 fingerprint = 2;
repeated cortex.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"];
repeated Chunk chunks = 4 [(gogoproto.nullable) = false];
// Last timestamp of the last chunk.
google.protobuf.Timestamp to = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string lastLine = 6;
}

@ -243,6 +243,8 @@ func Test_EncodingCheckpoint(t *testing.T) {
UserID: "fake",
Fingerprint: 123,
Labels: client.FromLabelsToLabelAdapters(ls),
To: time.Unix(10, 0),
LastLine: "lastLine",
Chunks: []Chunk{
{
From: from,
@ -275,12 +277,17 @@ func Test_EncodingCheckpoint(t *testing.T) {
outChunks := out.Chunks
out.Chunks = nil
zero := time.Unix(0, 0)
require.Equal(t, true, s.To.Equal(out.To))
s.To = zero
out.To = zero
require.Equal(t, s, out)
require.Equal(t, len(sChunks), len(outChunks))
for i, exp := range sChunks {
got := outChunks[i]
zero := time.Unix(0, 0)
// Issues diffing zero-value time.Locations against nil ones.
// Check/override them individually so that other fields get tested in an extensible manner.
require.Equal(t, true, exp.From.Equal(got.From))

@ -123,6 +123,8 @@ func (r *ingesterRecoverer) Series(series *Series) error {
}
bytesAdded, entriesAdded, err := stream.setChunks(series.Chunks)
stream.lastLine.ts = series.To
stream.lastLine.content = series.LastLine
if err != nil {
return err

@ -1,18 +1,23 @@
package ingester
import (
"context"
fmt "fmt"
"runtime"
"sync"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)
type MemoryWALReader struct {
@ -190,3 +195,83 @@ func Test_InMemorySegmentRecover(t *testing.T) {
}
}
func TestSeriesRecoveryNoDuplicates(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
store := &mockStore{
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, nil)
require.NoError(t, err)
mkSample := func(i int) *logproto.PushRequest {
return &logproto.PushRequest{
Streams: []logproto.Stream{
{
// Note: must use normalized labels here b/c we expect them
// sorted but use a string for efficiency.
Labels: `{bar="baz1", foo="bar"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("line %d", i),
},
},
},
},
}
}
req := mkSample(1)
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, req)
require.NoError(t, err)
iter := newIngesterSeriesIter(i).Iter()
require.Equal(t, true, iter.Next())
series := iter.Stream()
require.Equal(t, false, iter.Next())
// create a new ingester now
i, err = New(ingesterConfig, client.Config{}, store, limits, nil)
require.NoError(t, err)
// recover the checkpointed series
recoverer := newIngesterRecoverer(i)
require.NoError(t, recoverer.Series(series))
_, err = i.Push(ctx, req)
require.NoError(t, err) // we don't error on duplicate pushes
result := mockQuerierServer{
ctx: ctx,
}
// ensure no duplicate log lines exist
err = i.Query(&logproto.QueryRequest{
Selector: `{foo="bar",bar="baz1"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(10, 0),
}, &result)
require.NoError(t, err)
require.Len(t, result.resps, 1)
expected := []logproto.Stream{
{
Labels: `{bar="baz1", foo="bar"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "line 1",
},
},
},
}
require.Equal(t, expected, result.resps[0].Streams)
}

Loading…
Cancel
Save