WAL Replay counter (#3974)

* speccing out unordered head block

* testware & unordered serialise

* common utils for iter & sampleIter

* more generic forEntries

* more efficient unorderedHeadChunk serialise (no internal re-casting)

* roundtripping unordered head block, exit headchunk iteration early, add constant for current default chunk version

* adds head block write benchmarks for ordered & unordered writes

* fixes bench

* removes unused initializer

* gofmt

* linting

* introduces an entry counter into streams & wal records

* sets & resets entrycount on checkpoint recovery and after wal replay finishes

* pushing to streams checks counter

* reject pre-applied entries test

* removes commented test
pull/3919/head^2
Owen Diehl 5 years ago committed by GitHub
parent 78b073e022
commit 05b8537e79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      pkg/ingester/checkpoint.go
  2. 109
      pkg/ingester/checkpoint.pb.go
  3. 4
      pkg/ingester/checkpoint.proto
  4. 37
      pkg/ingester/encoding.go
  5. 103
      pkg/ingester/encoding_test.go
  6. 2
      pkg/ingester/instance.go
  7. 5
      pkg/ingester/metrics.go
  8. 13
      pkg/ingester/recovery.go
  9. 2
      pkg/ingester/recovery_test.go
  10. 31
      pkg/ingester/stream.go
  11. 43
      pkg/ingester/stream_test.go
  12. 2
      pkg/ingester/wal.go

@ -272,6 +272,7 @@ func (s *streamIterator) Next() bool {
s.current.To = stream.lastLine.ts
s.current.LastLine = stream.lastLine.content
s.current.EntryCt = stream.entryCt
return true
}

@ -144,6 +144,9 @@ type Series struct {
// Last timestamp of the last chunk.
To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
// highest counter value for pushes to this stream.
// Used to skip already applied entries during WAL replay.
EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"`
}
func (m *Series) Reset() { *m = Series{} }
@ -213,6 +216,13 @@ func (m *Series) GetLastLine() string {
return ""
}
// GetEntryCt returns the highest counter value recorded for pushes to this
// stream, or 0 when the receiver is nil.
func (m *Series) GetEntryCt() int64 {
	if m == nil {
		return 0
	}
	return m.EntryCt
}
func init() {
proto.RegisterType((*Chunk)(nil), "loki_ingester.Chunk")
proto.RegisterType((*Series)(nil), "loki_ingester.Series")
@ -221,38 +231,39 @@ func init() {
func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) }
var fileDescriptor_00f4b7152db9bdb5 = []byte{
// 492 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xbd, 0x8e, 0xd3, 0x4c,
0x14, 0xf5, 0x24, 0x8e, 0x3f, 0x67, 0xf2, 0xd1, 0x0c, 0x08, 0x8d, 0x22, 0x31, 0xb1, 0xb6, 0x4a,
0x83, 0x2d, 0x05, 0x0a, 0x68, 0x90, 0x62, 0x10, 0x12, 0xd2, 0x16, 0xc8, 0x40, 0x43, 0x83, 0xfc,
0x33, 0xb1, 0x4d, 0x1c, 0x8f, 0x35, 0x33, 0x96, 0xa0, 0xe3, 0x11, 0xf6, 0x31, 0x78, 0x04, 0x1e,
0x61, 0xcb, 0x94, 0x2b, 0x90, 0x16, 0xe2, 0x34, 0x94, 0xfb, 0x08, 0x68, 0xc6, 0x36, 0x1b, 0x4a,
0x77, 0xf7, 0x9c, 0x7b, 0x8f, 0xcf, 0xf5, 0x9d, 0x03, 0x1f, 0x54, 0xdb, 0xd4, 0xcb, 0xcb, 0x94,
0x0a, 0x49, 0xb9, 0x17, 0x67, 0x34, 0xde, 0x56, 0x2c, 0x2f, 0xa5, 0x5b, 0x71, 0x26, 0x19, 0xba,
0x53, 0xb0, 0x6d, 0xfe, 0xa1, 0xef, 0xcf, 0x17, 0x29, 0x63, 0x69, 0x41, 0x3d, 0xdd, 0x8c, 0xea,
0x8d, 0x27, 0xf3, 0x1d, 0x15, 0x32, 0xdc, 0x55, 0xed, 0xfc, 0xfc, 0x61, 0x9a, 0xcb, 0xac, 0x8e,
0xdc, 0x98, 0xed, 0xbc, 0x94, 0xa5, 0xec, 0x76, 0x52, 0x21, 0x0d, 0x74, 0xd5, 0x8d, 0x3f, 0x3d,
0x19, 0x8f, 0x19, 0x97, 0xf4, 0x53, 0xc5, 0xd9, 0x47, 0x1a, 0xcb, 0x0e, 0x79, 0x6a, 0xbb, 0xae,
0x11, 0x75, 0x45, 0x2b, 0x3d, 0xfb, 0x31, 0x82, 0x93, 0xe7, 0x59, 0x5d, 0x6e, 0xd1, 0x13, 0x68,
0x6e, 0x38, 0xdb, 0x61, 0xe0, 0x80, 0xe5, 0x6c, 0x35, 0x77, 0xdb, 0x1d, 0xdd, 0xde, 0xd9, 0x7d,
0xdb, 0xef, 0xe8, 0xdb, 0x97, 0xd7, 0x0b, 0xe3, 0xe2, 0xe7, 0x02, 0x04, 0x5a, 0x81, 0x1e, 0xc3,
0x91, 0x64, 0x78, 0x34, 0x40, 0x37, 0x92, 0x0c, 0xf9, 0x70, 0xba, 0x29, 0x6a, 0x91, 0xd1, 0x64,
0x2d, 0xf1, 0x78, 0x80, 0xf8, 0x56, 0x86, 0x5e, 0xc2, 0x59, 0x11, 0x0a, 0xf9, 0xae, 0x4a, 0x42,
0x49, 0x13, 0x6c, 0x0e, 0xf8, 0xca, 0xa9, 0x10, 0xdd, 0x87, 0x56, 0x5c, 0x30, 0x41, 0x13, 0x3c,
0x71, 0xc0, 0xd2, 0x0e, 0x3a, 0xa4, 0x78, 0xf1, 0xb9, 0x8c, 0x69, 0x82, 0xad, 0x96, 0x6f, 0x11,
0x42, 0xd0, 0x4c, 0x42, 0x19, 0xe2, 0xff, 0x1c, 0xb0, 0xfc, 0x3f, 0xd0, 0xb5, 0xe2, 0x32, 0x1a,
0x26, 0xd8, 0x6e, 0x39, 0x55, 0x9f, 0x7d, 0x1b, 0x41, 0xeb, 0x0d, 0xe5, 0x39, 0x15, 0xea, 0x53,
0xb5, 0xa0, 0xfc, 0xd5, 0x0b, 0x7d, 0xe0, 0x69, 0xd0, 0x21, 0xe4, 0xc0, 0xd9, 0x46, 0x05, 0x83,
0x57, 0x3c, 0x2f, 0xa5, 0xbe, 0xa2, 0x19, 0x9c, 0x52, 0xa8, 0x84, 0x56, 0x11, 0x46, 0xb4, 0x10,
0x78, 0xec, 0x8c, 0x97, 0xb3, 0xd5, 0x5d, 0xb7, 0x7f, 0x4a, 0xf7, 0x5c, 0xf1, 0xaf, 0xc3, 0x9c,
0xfb, 0x6b, 0xf5, 0x63, 0xdf, 0xaf, 0x17, 0x83, 0xa2, 0xd0, 0xea, 0xd7, 0x49, 0x58, 0x49, 0xca,
0x83, 0xce, 0x05, 0xad, 0xa0, 0x15, 0xab, 0x44, 0x08, 0x6c, 0x6a, 0xbf, 0x7b, 0xee, 0x3f, 0xe9,
0x75, 0x75, 0x5c, 0x7c, 0x53, 0x19, 0x06, 0xdd, 0x64, 0x17, 0x81, 0xc9, 0xc0, 0x08, 0xcc, 0xa1,
0xad, 0x5e, 0xe1, 0x3c, 0x2f, 0xa9, 0x3e, 0xf0, 0x34, 0xf8, 0x8b, 0xfd, 0x67, 0xfb, 0x03, 0x31,
0xae, 0x0e, 0xc4, 0xb8, 0x39, 0x10, 0xf0, 0xa5, 0x21, 0xe0, 0x6b, 0x43, 0xc0, 0x65, 0x43, 0xc0,
0xbe, 0x21, 0xe0, 0x57, 0x43, 0xc0, 0xef, 0x86, 0x18, 0x37, 0x0d, 0x01, 0x17, 0x47, 0x62, 0xec,
0x8f, 0xc4, 0xb8, 0x3a, 0x12, 0xe3, 0xbd, 0xdd, 0x6f, 0x19, 0x59, 0xda, 0xfd, 0xd1, 0x9f, 0x00,
0x00, 0x00, 0xff, 0xff, 0xae, 0x13, 0x93, 0xc4, 0x9a, 0x03, 0x00, 0x00,
// 503 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x3d, 0x8f, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x8e, 0x2f, 0xd9, 0x40, 0xb3, 0x20, 0xb4, 0x8a, 0xc4, 0xc6, 0xba, 0x2a, 0x0d,
0xb6, 0x14, 0x28, 0xa0, 0x41, 0x8a, 0x0f, 0x21, 0x21, 0x5d, 0x81, 0x0c, 0x34, 0x34, 0xc8, 0x1f,
0x1b, 0xdb, 0xc4, 0xf1, 0x5a, 0xbb, 0x6b, 0x89, 0xeb, 0xf8, 0x09, 0xf7, 0x33, 0xf8, 0x29, 0x57,
0x46, 0x54, 0x27, 0x90, 0x0e, 0xe2, 0x34, 0x94, 0xf7, 0x13, 0xd0, 0xae, 0x6d, 0x2e, 0x94, 0xee,
0xe6, 0xbd, 0x99, 0xe7, 0x79, 0x9e, 0x7d, 0xf0, 0x71, 0xb9, 0x49, 0xdc, 0xac, 0x48, 0xa8, 0x90,
0x94, 0xbb, 0x51, 0x4a, 0xa3, 0x4d, 0xc9, 0xb2, 0x42, 0x3a, 0x25, 0x67, 0x92, 0xa1, 0xfb, 0x39,
0xdb, 0x64, 0x9f, 0xba, 0xfe, 0x6c, 0x9e, 0x30, 0x96, 0xe4, 0xd4, 0xd5, 0xcd, 0xb0, 0x5a, 0xbb,
0x32, 0xdb, 0x52, 0x21, 0x83, 0x6d, 0xd9, 0xcc, 0xcf, 0x9e, 0x24, 0x99, 0x4c, 0xab, 0xd0, 0x89,
0xd8, 0xd6, 0x4d, 0x58, 0xc2, 0xee, 0x26, 0x15, 0xd2, 0x40, 0x57, 0xed, 0xf8, 0x8b, 0xa3, 0xf1,
0x88, 0x71, 0x49, 0xbf, 0x94, 0x9c, 0x7d, 0xa6, 0x91, 0x6c, 0x91, 0xab, 0xdc, 0xb5, 0x8d, 0xb0,
0x2d, 0x1a, 0xe9, 0xe9, 0xcf, 0x01, 0x1c, 0x9d, 0xa5, 0x55, 0xb1, 0x41, 0xcf, 0xa1, 0xb9, 0xe6,
0x6c, 0x8b, 0x81, 0x0d, 0x16, 0xd3, 0xe5, 0xcc, 0x69, 0x3c, 0x3a, 0xdd, 0x66, 0xe7, 0x7d, 0xe7,
0xd1, 0x1b, 0x5f, 0xdd, 0xcc, 0x8d, 0xcb, 0x5f, 0x73, 0xe0, 0x6b, 0x05, 0x7a, 0x06, 0x07, 0x92,
0xe1, 0x41, 0x0f, 0xdd, 0x40, 0x32, 0xe4, 0xc1, 0xc9, 0x3a, 0xaf, 0x44, 0x4a, 0xe3, 0x95, 0xc4,
0xc3, 0x1e, 0xe2, 0x3b, 0x19, 0x7a, 0x0d, 0xa7, 0x79, 0x20, 0xe4, 0x87, 0x32, 0x0e, 0x24, 0x8d,
0xb1, 0xd9, 0xe3, 0x2b, 0xc7, 0x42, 0xf4, 0x08, 0x5a, 0x51, 0xce, 0x04, 0x8d, 0xf1, 0xc8, 0x06,
0x8b, 0xb1, 0xdf, 0x22, 0xc5, 0x8b, 0x8b, 0x22, 0xa2, 0x31, 0xb6, 0x1a, 0xbe, 0x41, 0x08, 0x41,
0x33, 0x0e, 0x64, 0x80, 0x4f, 0x6c, 0xb0, 0xb8, 0xe7, 0xeb, 0x5a, 0x71, 0x29, 0x0d, 0x62, 0x3c,
0x6e, 0x38, 0x55, 0x9f, 0x7e, 0x1f, 0x40, 0xeb, 0x1d, 0xe5, 0x19, 0x15, 0xea, 0x53, 0x95, 0xa0,
0xfc, 0xcd, 0x2b, 0x7d, 0xe0, 0x89, 0xdf, 0x22, 0x64, 0xc3, 0xe9, 0x5a, 0x05, 0x83, 0x97, 0x3c,
0x2b, 0xa4, 0xbe, 0xa2, 0xe9, 0x1f, 0x53, 0xa8, 0x80, 0x56, 0x1e, 0x84, 0x34, 0x17, 0x78, 0x68,
0x0f, 0x17, 0xd3, 0xe5, 0x03, 0xa7, 0x7b, 0x4a, 0xe7, 0x5c, 0xf1, 0x6f, 0x83, 0x8c, 0x7b, 0x2b,
0xf5, 0x63, 0x3f, 0x6e, 0xe6, 0xbd, 0xa2, 0xd0, 0xe8, 0x57, 0x71, 0x50, 0x4a, 0xca, 0xfd, 0x76,
0x0b, 0x5a, 0x42, 0x2b, 0x52, 0x89, 0x10, 0xd8, 0xd4, 0xfb, 0x1e, 0x3a, 0xff, 0xa5, 0xd7, 0xd1,
0x71, 0xf1, 0x4c, 0xb5, 0xd0, 0x6f, 0x27, 0xdb, 0x08, 0x8c, 0x7a, 0x46, 0x60, 0x06, 0xc7, 0xea,
0x15, 0xce, 0xb3, 0x82, 0xea, 0x03, 0x4f, 0xfc, 0x7f, 0x18, 0x61, 0x78, 0x42, 0x0b, 0xc9, 0x2f,
0xce, 0xa4, 0xbe, 0xf2, 0xd0, 0xef, 0xa0, 0xf7, 0x72, 0xb7, 0x27, 0xc6, 0xf5, 0x9e, 0x18, 0xb7,
0x7b, 0x02, 0xbe, 0xd6, 0x04, 0x7c, 0xab, 0x09, 0xb8, 0xaa, 0x09, 0xd8, 0xd5, 0x04, 0xfc, 0xae,
0x09, 0xf8, 0x53, 0x13, 0xe3, 0xb6, 0x26, 0xe0, 0xf2, 0x40, 0x8c, 0xdd, 0x81, 0x18, 0xd7, 0x07,
0x62, 0x7c, 0x1c, 0x77, 0xfe, 0x43, 0x4b, 0xfb, 0x7a, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x70,
0x87, 0xe1, 0x9b, 0xb4, 0x03, 0x00, 0x00,
}
func (this *Chunk) Equal(that interface{}) bool {
@ -347,6 +358,9 @@ func (this *Series) Equal(that interface{}) bool {
if this.LastLine != that1.LastLine {
return false
}
if this.EntryCt != that1.EntryCt {
return false
}
return true
}
func (this *Chunk) GoString() string {
@ -370,7 +384,7 @@ func (this *Series) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 10)
s := make([]string, 0, 11)
s = append(s, "&ingester.Series{")
s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n")
s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n")
@ -384,6 +398,7 @@ func (this *Series) GoString() string {
}
s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n")
s = append(s, "LastLine: "+fmt.Sprintf("%#v", this.LastLine)+",\n")
s = append(s, "EntryCt: "+fmt.Sprintf("%#v", this.EntryCt)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -541,6 +556,11 @@ func (m *Series) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.LastLine)))
i += copy(dAtA[i:], m.LastLine)
}
if m.EntryCt != 0 {
dAtA[i] = 0x38
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(m.EntryCt))
}
return i, nil
}
@ -615,6 +635,9 @@ func (m *Series) Size() (n int) {
if l > 0 {
n += 1 + l + sovCheckpoint(uint64(l))
}
if m.EntryCt != 0 {
n += 1 + sovCheckpoint(uint64(m.EntryCt))
}
return n
}
@ -659,6 +682,7 @@ func (this *Series) String() string {
`Chunks:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Chunks), "Chunk", "Chunk", 1), `&`, ``, 1) + `,`,
`To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`LastLine:` + fmt.Sprintf("%v", this.LastLine) + `,`,
`EntryCt:` + fmt.Sprintf("%v", this.EntryCt) + `,`,
`}`,
}, "")
return s
@ -1177,6 +1201,25 @@ func (m *Series) Unmarshal(dAtA []byte) error {
}
m.LastLine = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 7:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field EntryCt", wireType)
}
m.EntryCt = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.EntryCt |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipCheckpoint(dAtA[iNdEx:])

@ -33,4 +33,8 @@ message Series {
// Last timestamp of the last chunk.
google.protobuf.Timestamp to = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string lastLine = 6;
// highest counter value for pushes to this stream.
// Used to skip already applied entries during WAL replay.
int64 entryCt = 7;
}

@ -17,12 +17,19 @@ const (
_ = iota // ignore first value so the zero value doesn't look like a record type.
// WALRecordSeries is the type for the WAL record for series.
WALRecordSeries RecordType = iota
// WALRecordSamples is the type for the WAL record for samples.
WALRecordEntries
// WALRecordEntriesV1 is the type for the WAL record for samples.
WALRecordEntriesV1
// CheckpointRecord is the type for the Checkpoint record based on protos.
CheckpointRecord
// WALRecordEntriesV2 is the type for the WAL record for samples with an
// additional counter value for use in replaying without the ordering constraint.
WALRecordEntriesV2
)
// CurrentEntriesRec is the type of entries record this build writes to the WAL.
// Loki can read older variants in a backwards-compatible manner, but will
// always write the newest variant.
const CurrentEntriesRec RecordType = WALRecordEntriesV2
// WALRecord is a struct combining the series and samples record.
type WALRecord struct {
UserID string
@ -52,20 +59,23 @@ func (r *WALRecord) Reset() {
r.entryIndexMap = make(map[uint64]int)
}
func (r *WALRecord) AddEntries(fp uint64, entries ...logproto.Entry) {
// AddEntries buffers entries for the stream identified by fingerprint fp into
// the WAL record, tagging them with counter — the stream's highest push
// counter value. The counter is used during WAL replay to skip entries that
// were already recovered from a checkpoint.
func (r *WALRecord) AddEntries(fp uint64, counter int64, entries ...logproto.Entry) {
	// Fingerprint already present in this record: append to its existing
	// RefEntries slot and advance the counter to the latest value.
	if idx, ok := r.entryIndexMap[fp]; ok {
		r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...)
		r.RefEntries[idx].Counter = counter
		return
	}

	// First entries for this fingerprint: remember its slot index so later
	// calls can append, then create the new RefEntries slot.
	r.entryIndexMap[fp] = len(r.RefEntries)
	r.RefEntries = append(r.RefEntries, RefEntries{
		Counter: counter,
		Ref:     fp,
		Entries: entries,
	})
}
// RefEntries pairs a stream fingerprint with the entries written for it in a
// WAL record.
type RefEntries struct {
	// Counter is the stream's highest push counter value at write time. It is
	// used during WAL replay to discard entries already applied via a
	// checkpoint.
	Counter int64
	// Ref is the stream fingerprint.
	Ref     uint64
	Entries []logproto.Entry
}
@ -84,9 +94,9 @@ func (r *WALRecord) encodeSeries(b []byte) []byte {
return encoded
}
func (r *WALRecord) encodeEntries(b []byte) []byte {
func (r *WALRecord) encodeEntries(version RecordType, b []byte) []byte {
buf := EncWith(b)
buf.PutByte(byte(WALRecordEntries))
buf.PutByte(byte(version))
buf.PutUvarintStr(r.UserID)
// Placeholder for the first timestamp of any sample encountered.
@ -108,7 +118,12 @@ outer:
if len(ref.Entries) < 1 {
continue
}
buf.PutBE64(ref.Ref) // write fingerprint
buf.PutBE64(ref.Ref) // write fingerprint
if version >= WALRecordEntriesV2 {
buf.PutBE64int64(ref.Counter) // write highest counter value
}
buf.PutUvarint(len(ref.Entries)) // write number of entries
for _, s := range ref.Entries {
@ -120,7 +135,7 @@ outer:
return buf.Get()
}
func decodeEntries(b []byte, rec *WALRecord) error {
func decodeEntries(b []byte, version RecordType, rec *WALRecord) error {
if len(b) == 0 {
return nil
}
@ -133,6 +148,10 @@ func decodeEntries(b []byte, rec *WALRecord) error {
Ref: dec.Be64(),
}
if version >= WALRecordEntriesV2 {
refEntries.Counter = dec.Be64int64()
}
nEntries := dec.Uvarint()
refEntries.Entries = make([]logproto.Entry, 0, nEntries)
rem := nEntries
@ -178,9 +197,9 @@ func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
case WALRecordSeries:
userID = decbuf.UvarintStr()
rSeries, err = dec.Series(decbuf.B, walRec.Series)
case WALRecordEntries:
case WALRecordEntriesV1, WALRecordEntriesV2:
userID = decbuf.UvarintStr()
err = decodeEntries(decbuf.B, walRec)
err = decodeEntries(decbuf.B, t, walRec)
default:
return errors.New("unknown record type")
}

@ -51,46 +51,93 @@ func Test_Encoding_Series(t *testing.T) {
}
func Test_Encoding_Entries(t *testing.T) {
record := &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Ref: 456,
Entries: []logproto.Entry{
for _, tc := range []struct {
desc string
rec *WALRecord
version RecordType
}{
{
desc: "v1",
rec: &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
Ref: 456,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
},
},
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
Ref: 789,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3000, 0),
Line: "third",
},
{
Timestamp: time.Unix(4000, 0),
Line: "fourth",
},
},
},
},
},
{
Ref: 789,
Entries: []logproto.Entry{
version: WALRecordEntriesV1,
},
{
desc: "v2",
rec: &WALRecord{
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Timestamp: time.Unix(3000, 0),
Line: "third",
Ref: 456,
Counter: 1, // v2 uses counter for WAL replay
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
},
},
},
{
Timestamp: time.Unix(4000, 0),
Line: "fourth",
Ref: 789,
Counter: 2, // v2 uses counter for WAL replay
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3000, 0),
Line: "third",
},
{
Timestamp: time.Unix(4000, 0),
Line: "fourth",
},
},
},
},
},
version: WALRecordEntriesV2,
},
}
buf := record.encodeEntries(nil)
decoded := recordPool.GetRecord()
} {
decoded := recordPool.GetRecord()
buf := tc.rec.encodeEntries(tc.version, nil)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, tc.rec, decoded)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, decoded)
}
}
func Benchmark_EncodeEntries(b *testing.B) {
@ -121,7 +168,7 @@ func Benchmark_EncodeEntries(b *testing.B) {
defer recordPool.PutBytes(buf)
for n := 0; n < b.N; n++ {
record.encodeEntries(buf)
record.encodeEntries(CurrentEntriesRec, buf)
}
}
@ -148,7 +195,7 @@ func Benchmark_DecodeWAL(b *testing.B) {
},
}
buf := record.encodeEntries(nil)
buf := record.encodeEntries(CurrentEntriesRec, nil)
rec := recordPool.GetRecord()
b.ReportAllocs()
b.ResetTimer()

@ -165,7 +165,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
continue
}
if _, err := stream.Push(ctx, s.Entries, record); err != nil {
if _, err := stream.Push(ctx, s.Entries, record, 0); err != nil {
appendErr = err
continue
}

@ -22,6 +22,7 @@ type ingesterMetrics struct {
recoveredStreamsTotal prometheus.Counter
recoveredChunksTotal prometheus.Counter
recoveredEntriesTotal prometheus.Counter
duplicateEntriesTotal prometheus.Counter
recoveredBytesTotal prometheus.Counter
recoveryBytesInUse prometheus.Gauge
recoveryIsFlushing prometheus.Gauge
@ -100,6 +101,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Name: "loki_ingester_wal_recovered_entries_total",
Help: "Total number of entries recovered from the WAL.",
}),
duplicateEntriesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_duplicate_entries_total",
Help: "Entries discarded during WAL replay due to existing in checkpoints.",
}),
recoveredBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_recovered_bytes_total",
Help: "Total number of bytes recovered from the WAL.",

@ -125,6 +125,7 @@ func (r *ingesterRecoverer) Series(series *Series) error {
bytesAdded, entriesAdded, err := stream.setChunks(series.Chunks)
stream.lastLine.ts = series.To
stream.lastLine.content = series.LastLine
stream.entryCt = series.EntryCt
if err != nil {
return err
@ -190,13 +191,23 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error {
}
// ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments)
bytesAdded, _ := s.(*stream).Push(context.Background(), entries.Entries, nil)
bytesAdded, err := s.(*stream).Push(context.Background(), entries.Entries, nil, entries.Counter)
r.ing.replayController.Add(int64(bytesAdded))
if err != nil && err == ErrEntriesExist {
r.ing.metrics.duplicateEntriesTotal.Add(float64(len(entries.Entries)))
}
return nil
})
}
// Close finishes WAL recovery: it resets every stream's entry counter so that
// subsequent live pushes (which pass counter == 0) are accepted normally, then
// closes the done channel to signal completion.
func (r *ingesterRecoverer) Close() {
	// reset all the incrementing stream counters after a successful WAL replay.
	// Note: forAllStreams' return value is ignored here; the callback always
	// returns nil.
	for _, inst := range r.ing.getInstances() {
		inst.forAllStreams(context.Background(), func(s *stream) error {
			s.resetCounter()
			return nil
		})
	}

	close(r.done)
}

@ -90,7 +90,7 @@ func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALRea
}
if len(rec.RefEntries) > 0 {
reader.xs = append(reader.xs, rec.encodeEntries(nil))
reader.xs = append(reader.xs, rec.encodeEntries(CurrentEntriesRec, nil))
}
}

@ -10,6 +10,7 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -46,6 +47,10 @@ var (
})
)
// ErrEntriesExist is returned by stream.Push during WAL replay when the
// incoming counter value shows the entries were already applied (i.e. they
// were recovered from a checkpoint).
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
func init() {
prometheus.MustRegister(chunksCreatedTotal)
prometheus.MustRegister(samplesPerChunk)
@ -72,6 +77,13 @@ type stream struct {
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
// entryCt is a counter which is incremented on each accepted entry.
// This allows us to discard WAL entries during replays which were
// already recovered via checkpoints. Historically out of order
// errors were used to detect this, but this counter has been
// introduced to facilitate removing the ordering constraint.
entryCt int64
}
type chunkDesc struct {
@ -139,10 +151,22 @@ func (s *stream) NewChunk() *chunkenc.MemChunk {
func (s *stream) Push(
ctx context.Context,
entries []logproto.Entry,
// WAL record to add push contents to.
// May be nil to disable this functionality.
record *WALRecord,
// Counter used in WAL replay to avoid duplicates.
// If this is non-zero, the stream will reject entries
// with a counter value less than or equal to its own.
// It is set to zero and thus bypassed outside of WAL replays.
counter int64,
) (int, error) {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
if counter > 0 && counter <= s.entryCt {
return 0, ErrEntriesExist
}
var bytesAdded int
prevNumChunks := len(s.chunks)
var lastChunkTimestamp time.Time
@ -201,6 +225,7 @@ func (s *stream) Push(
lastChunkTimestamp = entries[i].Timestamp
s.lastLine.ts = lastChunkTimestamp
s.lastLine.content = entries[i].Line
s.entryCt++
// length of string plus
bytesAdded += len(entries[i].Line)
@ -211,7 +236,7 @@ func (s *stream) Push(
if len(storedEntries) != 0 {
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
record.AddEntries(uint64(s.fp), storedEntries...)
record.AddEntries(uint64(s.fp), s.entryCt, storedEntries...)
} else {
// If record is nil, this is a WAL recovery.
s.metrics.recoveredEntriesTotal.Add(float64(len(storedEntries)))
@ -368,3 +393,7 @@ func (s *stream) addTailer(t *tailer) {
s.tailers[t.getID()] = t
}
// resetCounter zeroes the stream's entry counter. It is called once WAL
// replay completes so that live pushes (which pass counter == 0) are no
// longer rejected as duplicates.
func (s *stream) resetCounter() {
	s.entryCt = 0
}

@ -46,7 +46,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
_, err := s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(int64(numLogs), 0), Line: "log"},
}, recordPool.GetRecord())
}, recordPool.GetRecord(), 0)
require.NoError(t, err)
newLines := make([]logproto.Entry, numLogs)
@ -65,7 +65,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs)
expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String())
_, err = s.Push(context.Background(), newLines, recordPool.GetRecord())
_, err = s.Push(context.Background(), newLines, recordPool.GetRecord(), 0)
require.Error(t, err)
require.Equal(t, expectErr.Error(), err.Error())
})
@ -86,7 +86,7 @@ func TestPushDeduplication(t *testing.T) {
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
}, recordPool.GetRecord())
}, recordPool.GetRecord(), 0)
require.NoError(t, err)
require.Len(t, s.chunks, 1)
require.Equal(t, s.chunks[0].chunk.Size(), 2,
@ -94,6 +94,41 @@ func TestPushDeduplication(t *testing.T) {
require.Equal(t, len("test"+"newer, better test"), written)
}
// TestPushRejectOldCounter verifies that a stream rejects replayed pushes
// whose counter is at or below its own entry counter, and accepts pushes
// with a strictly greater counter.
func TestPushRejectOldCounter(t *testing.T) {
	strm := newStream(
		defaultConfig(),
		model.Fingerprint(0),
		labels.Labels{
			{Name: "foo", Value: "bar"},
		},
		NilMetrics,
	)

	// Push three entries; the exact duplicate is deduped, leaving the
	// stream's internal counter at 2.
	initial := []logproto.Entry{
		{Timestamp: time.Unix(1, 0), Line: "test"},
		{Timestamp: time.Unix(1, 0), Line: "test"},
		{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
	}
	_, err := strm.Push(context.Background(), initial, recordPool.GetRecord(), 0)
	require.NoError(t, err)
	require.Len(t, strm.chunks, 1)
	require.Equal(t, strm.chunks[0].chunk.Size(), 2,
		"expected exact duplicate to be dropped and newer content with same timestamp to be appended")

	// A counter less than or equal to the stream's own must be rejected.
	_, err = strm.Push(context.Background(), []logproto.Entry{
		{Timestamp: time.Unix(1, 0), Line: "test"},
	}, recordPool.GetRecord(), 2)
	require.Equal(t, ErrEntriesExist, err)

	// A strictly greater counter succeeds.
	_, err = strm.Push(context.Background(), []logproto.Entry{
		{Timestamp: time.Unix(1, 0), Line: "test"},
	}, recordPool.GetRecord(), 3)
	require.Nil(t, err)
}
func TestStreamIterator(t *testing.T) {
const chunks = 3
const entries = 100
@ -164,7 +199,7 @@ func Benchmark_PushStream(b *testing.B) {
for n := 0; n < b.N; n++ {
rec := recordPool.GetRecord()
_, err := s.Push(ctx, e, rec)
_, err := s.Push(ctx, e, rec, 0)
require.NoError(b, err)
recordPool.PutRecord(rec)
}

@ -126,7 +126,7 @@ func (w *walWrapper) Log(record *WALRecord) error {
buf = buf[:0]
}
if len(record.RefEntries) > 0 {
buf = record.encodeEntries(buf)
buf = record.encodeEntries(CurrentEntriesRec, buf)
if err := w.wal.Log(buf); err != nil {
return err
}

Loading…
Cancel
Save