diff --git a/CHANGELOG.md b/CHANGELOG.md index 86ffa026bd..46c7aaf7e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,8 @@ ##### Changes +* [8315](https://github.com/grafana/loki/pull/8315) **thepalbi** Relicense and export `pkg/ingester` WAL code to be used in Promtail's WAL. + #### Promtail ##### Enhancements diff --git a/LICENSING.md b/LICENSING.md index 50371c8581..91ea334a41 100644 --- a/LICENSING.md +++ b/LICENSING.md @@ -10,6 +10,7 @@ The following folders and their subfolders are licensed under Apache-2.0: ``` clients/ +pkg/ingester/wal pkg/logproto/ pkg/loghttp/ pkg/logqlmodel/ diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index af66046333..21ee49c584 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -21,6 +21,7 @@ import ( prompool "github.com/prometheus/prometheus/util/pool" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/logproto" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/pool" @@ -124,15 +125,15 @@ func decodeCheckpointRecord(rec []byte, s *Series) error { cpy := make([]byte, len(rec)) copy(cpy, rec) - switch RecordType(cpy[0]) { - case CheckpointRecord: + switch wal.RecordType(cpy[0]) { + case wal.CheckpointRecord: return proto.Unmarshal(cpy[1:], s) default: return errors.Errorf("unexpected record type: %d", rec[0]) } } -func encodeWithTypeHeader(m *Series, typ RecordType, buf []byte) ([]byte, error) { +func encodeWithTypeHeader(m *Series, typ wal.RecordType, buf []byte) ([]byte, error) { size := m.Size() if cap(buf) < size+1 { buf = make([]byte, size+1) @@ -370,7 +371,7 @@ func (w *WALCheckpointWriter) Write(s *Series) error { size := s.Size() + 1 // +1 for header buf := recordBufferPool.Get(size).([]byte)[:size] - b, err := encodeWithTypeHeader(s, CheckpointRecord, buf) + b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, buf) if err != nil { return err } diff --git 
a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 115fcea23b..0e0a92d09e 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -6,204 +6,13 @@ import ( "time" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/tsdb/record" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/logproto" ) -func Test_Encoding_Series(t *testing.T) { - record := &WALRecord{ - entryIndexMap: make(map[uint64]int), - UserID: "123", - Series: []record.RefSeries{ - { - Ref: 456, - Labels: labels.FromMap(map[string]string{ - "foo": "bar", - "bazz": "buzz", - }), - }, - { - Ref: 789, - Labels: labels.FromMap(map[string]string{ - "abc": "123", - "def": "456", - }), - }, - }, - } - - buf := record.encodeSeries(nil) - - decoded := recordPool.GetRecord() - - err := decodeWALRecord(buf, decoded) - require.Nil(t, err) - - // Since we use a pool, there can be subtle differentiations between nil slices and len(0) slices. - // Both are valid, so check length. 
- require.Equal(t, 0, len(decoded.RefEntries)) - decoded.RefEntries = nil - require.Equal(t, record, decoded) -} - -func Test_Encoding_Entries(t *testing.T) { - for _, tc := range []struct { - desc string - rec *WALRecord - version RecordType - }{ - { - desc: "v1", - rec: &WALRecord{ - entryIndexMap: make(map[uint64]int), - UserID: "123", - RefEntries: []RefEntries{ - { - Ref: 456, - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(1000, 0), - Line: "first", - }, - { - Timestamp: time.Unix(2000, 0), - Line: "second", - }, - }, - }, - { - Ref: 789, - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(3000, 0), - Line: "third", - }, - { - Timestamp: time.Unix(4000, 0), - Line: "fourth", - }, - }, - }, - }, - }, - version: WALRecordEntriesV1, - }, - { - desc: "v2", - rec: &WALRecord{ - entryIndexMap: make(map[uint64]int), - UserID: "123", - RefEntries: []RefEntries{ - { - Ref: 456, - Counter: 1, // v2 uses counter for WAL replay - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(1000, 0), - Line: "first", - }, - { - Timestamp: time.Unix(2000, 0), - Line: "second", - }, - }, - }, - { - Ref: 789, - Counter: 2, // v2 uses counter for WAL replay - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(3000, 0), - Line: "third", - }, - { - Timestamp: time.Unix(4000, 0), - Line: "fourth", - }, - }, - }, - }, - }, - version: WALRecordEntriesV2, - }, - } { - decoded := recordPool.GetRecord() - buf := tc.rec.encodeEntries(tc.version, nil) - err := decodeWALRecord(buf, decoded) - require.Nil(t, err) - require.Equal(t, tc.rec, decoded) - - } -} - -func Benchmark_EncodeEntries(b *testing.B) { - var entries []logproto.Entry - for i := int64(0); i < 10000; i++ { - entries = append(entries, logproto.Entry{ - Timestamp: time.Unix(0, i), - Line: fmt.Sprintf("long line with a lot of data like a log %d", i), - }) - } - record := &WALRecord{ - entryIndexMap: make(map[uint64]int), - UserID: "123", - RefEntries: []RefEntries{ - { - Ref: 456, - Entries: entries, - }, - { - Ref: 
789, - Entries: entries, - }, - }, - } - b.ReportAllocs() - b.ResetTimer() - buf := recordPool.GetBytes()[:0] - defer recordPool.PutBytes(buf) - - for n := 0; n < b.N; n++ { - record.encodeEntries(CurrentEntriesRec, buf) - } -} - -func Benchmark_DecodeWAL(b *testing.B) { - var entries []logproto.Entry - for i := int64(0); i < 10000; i++ { - entries = append(entries, logproto.Entry{ - Timestamp: time.Unix(0, i), - Line: fmt.Sprintf("long line with a lot of data like a log %d", i), - }) - } - record := &WALRecord{ - entryIndexMap: make(map[uint64]int), - UserID: "123", - RefEntries: []RefEntries{ - { - Ref: 456, - Entries: entries, - }, - { - Ref: 789, - Entries: entries, - }, - }, - } - - buf := record.encodeEntries(CurrentEntriesRec, nil) - rec := recordPool.GetRecord() - b.ReportAllocs() - b.ResetTimer() - - for n := 0; n < b.N; n++ { - require.NoError(b, decodeWALRecord(buf, rec)) - } -} - func fillChunk(t testing.TB, c chunkenc.Chunk) { t.Helper() var i int64 @@ -338,7 +147,7 @@ func Test_EncodingCheckpoint(t *testing.T) { }, } - b, err := encodeWithTypeHeader(s, CheckpointRecord, nil) + b, err := encodeWithTypeHeader(s, wal.CheckpointRecord, nil) require.Nil(t, err) out := &Series{} diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index f0a372679e..9cb54940eb 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -65,9 +66,9 @@ func TestChunkFlushingShutdown(t *testing.T) { type fullWAL struct{} -func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} } -func (fullWAL) Start() {} -func (fullWAL) Stop() error { return nil } +func (fullWAL) Log(_ *wal.Record) error { return &os.PathError{Err: syscall.ENOSPC} } +func (fullWAL) Start() 
{} +func (fullWAL) Stop() error { return nil } func Benchmark_FlushLoop(b *testing.B) { var ( diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 387d3da415..83aa48df46 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -20,6 +20,7 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/pkg/ingester/index" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -228,7 +229,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { return appendErr } -func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) { +func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) { // record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after // reducing the stream limits, for instance. var err error @@ -315,7 +316,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) *str // getOrCreateStream returns the stream or creates it. // It's safe to use this function if returned stream is not consistency sensitive to streamsMap(e.g. ingesterRecoverer), // otherwise use streamsMap.LoadOrStoreNew with locking stream's chunkMtx inside. 
-func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALRecord) (*stream, error) { +func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *wal.Record) (*stream, error) { s, _, err := i.streams.LoadOrStoreNew(pushReqStream.Labels, func() (*stream, error) { return i.createStream(pushReqStream, record) }, nil) diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 231d2e6ca7..6b99b61f97 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -1,7 +1,7 @@ package ingester import ( - io "io" + "io" "runtime" "sync" @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/tsdb/wlog" "golang.org/x/net/context" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/logproto" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -52,7 +53,7 @@ type Recoverer interface { NumWorkers() int Series(series *Series) error SetStream(userID string, series record.RefSeries) error - Push(userID string, entries RefEntries) error + Push(userID string, entries wal.RefEntries) error Done() <-chan struct{} } @@ -152,7 +153,7 @@ func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) er return nil } -func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { +func (r *ingesterRecoverer) Push(userID string, entries wal.RefEntries) error { return r.ing.replayController.WithBackPressure(func() error { out, ok := r.users.Load(userID) if !ok { @@ -232,7 +233,7 @@ func (r *ingesterRecoverer) Done() <-chan struct{} { func RecoverWAL(reader WALReader, recoverer Recoverer) error { dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error { rec := recordPool.GetRecord() - if err := decodeWALRecord(b, rec); err != nil { + if err := wal.DecodeRecord(b, rec); err != nil { return err } @@ -267,7 +268,7 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error { if !ok { return } - entries, ok := next.data.(RefEntries) + entries, ok := 
next.data.(wal.RefEntries) var err error if !ok { err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, entries) diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index 1f643b108b..6a663b7f42 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -2,7 +2,7 @@ package ingester import ( "context" - fmt "fmt" + "fmt" "runtime" "sync" "testing" @@ -16,6 +16,7 @@ import ( "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/logproto" loki_runtime "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage/chunk" @@ -47,12 +48,12 @@ func (m *MemoryWALReader) Err() error { return nil } func (m *MemoryWALReader) Record() []byte { return m.xs[0] } -func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALReader, []*WALRecord) { - var recs []*WALRecord +func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALReader, []*wal.Record) { + var recs []*wal.Record reader := &MemoryWALReader{} for i := 0; i < totalStreams; i++ { user := fmt.Sprintf("%d", i%users) - recs = append(recs, &WALRecord{ + recs = append(recs, &wal.Record{ UserID: user, Series: []record.RefSeries{ { @@ -74,9 +75,9 @@ func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALRea Line: fmt.Sprintf("%d", j), }) } - recs = append(recs, &WALRecord{ + recs = append(recs, &wal.Record{ UserID: user, - RefEntries: []RefEntries{ + RefEntries: []wal.RefEntries{ { Ref: chunks.HeadSeriesRef(i), Entries: entries, @@ -87,11 +88,11 @@ func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALRea for _, rec := range recs { if len(rec.Series) > 0 { - reader.xs = append(reader.xs, rec.encodeSeries(nil)) + reader.xs = append(reader.xs, rec.EncodeSeries(nil)) } if len(rec.RefEntries) > 0 { - reader.xs = append(reader.xs, 
rec.encodeEntries(CurrentEntriesRec, nil)) + reader.xs = append(reader.xs, rec.EncodeEntries(wal.CurrentEntriesRec, nil)) } } @@ -136,7 +137,7 @@ func (r *MemRecoverer) SetStream(userID string, series record.RefSeries) error { return nil } -func (r *MemRecoverer) Push(userID string, entries RefEntries) error { +func (r *MemRecoverer) Push(userID string, entries wal.RefEntries) error { r.Lock() defer r.Unlock() diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index d2bdb86cc5..30435f934e 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,15 +8,14 @@ import ( "sync" "time" - "github.com/weaveworks/common/httpgrpc" - "github.com/go-kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" @@ -149,7 +148,7 @@ func (s *stream) Push( entries []logproto.Entry, // WAL record to add push contents to. // May be nil to disable this functionality. - record *WALRecord, + record *wal.Record, // Counter used in WAL replay to avoid duplicates. // If this is non-zero, the stream will reject entries // with a counter value less than or equal to it's own. 
@@ -250,7 +249,7 @@ func hasRateLimitErr(errs []entryWithError) bool { return ok } -func (s *stream) recordAndSendToTailers(record *WALRecord, entries []logproto.Entry) { +func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.Entry) { if len(entries) == 0 { return } diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 56fcf4ceb2..dfb248a061 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -10,14 +10,14 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/tsdb/wlog" - "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/util/flagext" util_log "github.com/grafana/loki/pkg/util/log" ) var ( // shared pool for WALRecords and []logproto.Entries - recordPool = newRecordPool() + recordPool = wal.NewRecordPool() ) const walSegmentSize = wlog.DefaultSegmentSize * 4 @@ -54,16 +54,16 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { type WAL interface { Start() // Log marshalls the records and writes it into the WAL. - Log(*WALRecord) error + Log(*wal.Record) error // Stop stops all the WAL operations. Stop() error } type noopWAL struct{} -func (noopWAL) Start() {} -func (noopWAL) Log(*WALRecord) error { return nil } -func (noopWAL) Stop() error { return nil } +func (noopWAL) Start() {} +func (noopWAL) Log(*wal.Record) error { return nil } +func (noopWAL) Stop() error { return nil } type walWrapper struct { cfg WALConfig @@ -102,7 +102,7 @@ func (w *walWrapper) Start() { go w.run() } -func (w *walWrapper) Log(record *WALRecord) error { +func (w *walWrapper) Log(record *wal.Record) error { if record == nil || (len(record.Series) == 0 && len(record.RefEntries) == 0) { return nil } @@ -117,7 +117,7 @@ func (w *walWrapper) Log(record *WALRecord) error { // Always write series then entries. 
if len(record.Series) > 0 { - buf = record.encodeSeries(buf) + buf = record.EncodeSeries(buf) if err := w.wal.Log(buf); err != nil { return err } @@ -126,7 +126,7 @@ func (w *walWrapper) Log(record *WALRecord) error { buf = buf[:0] } if len(record.RefEntries) > 0 { - buf = record.encodeEntries(CurrentEntriesRec, buf) + buf = record.EncodeEntries(wal.CurrentEntriesRec, buf) if err := w.wal.Log(buf); err != nil { return err } @@ -166,55 +166,3 @@ func (w *walWrapper) run() { checkpointer.Run() } - -type resettingPool struct { - rPool *sync.Pool // records - ePool *sync.Pool // entries - bPool *sync.Pool // bytes -} - -func (p *resettingPool) GetRecord() *WALRecord { - rec := p.rPool.Get().(*WALRecord) - rec.Reset() - return rec -} - -func (p *resettingPool) PutRecord(r *WALRecord) { - p.rPool.Put(r) -} - -func (p *resettingPool) GetEntries() []logproto.Entry { - return p.ePool.Get().([]logproto.Entry) -} - -func (p *resettingPool) PutEntries(es []logproto.Entry) { - p.ePool.Put(es[:0]) // nolint:staticcheck -} - -func (p *resettingPool) GetBytes() []byte { - return p.bPool.Get().([]byte) -} - -func (p *resettingPool) PutBytes(b []byte) { - p.bPool.Put(b[:0]) // nolint:staticcheck -} - -func newRecordPool() *resettingPool { - return &resettingPool{ - rPool: &sync.Pool{ - New: func() interface{} { - return &WALRecord{} - }, - }, - ePool: &sync.Pool{ - New: func() interface{} { - return make([]logproto.Entry, 0, 512) - }, - }, - bPool: &sync.Pool{ - New: func() interface{} { - return make([]byte, 0, 1<<10) // 1kb - }, - }, - } -} diff --git a/pkg/ingester/wal/LICENSE_APACHE2 b/pkg/ingester/wal/LICENSE_APACHE2 new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/pkg/ingester/wal/LICENSE_APACHE2 @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/pkg/ingester/encoding.go b/pkg/ingester/wal/encoding.go similarity index 87% rename from pkg/ingester/encoding.go rename to pkg/ingester/wal/encoding.go index 9f3d76a2f7..7c5411d7ea 100644 --- a/pkg/ingester/encoding.go +++ b/pkg/ingester/wal/encoding.go @@ -1,4 +1,4 @@ -package ingester +package wal import ( "time" @@ -29,10 +29,10 @@ const ( // The current type of Entries that this distribution writes. // Loki can read in a backwards compatible manner, but will write the newest variant. -const CurrentEntriesRec RecordType = WALRecordEntriesV2 +const CurrentEntriesRec = WALRecordEntriesV2 -// WALRecord is a struct combining the series and samples record. -type WALRecord struct { +// Record is a struct combining the series and samples record. 
+type Record struct { UserID string Series []record.RefSeries @@ -43,24 +43,21 @@ type WALRecord struct { RefEntries []RefEntries } -func (r *WALRecord) IsEmpty() bool { +func (r *Record) IsEmpty() bool { return len(r.Series) == 0 && len(r.RefEntries) == 0 } -func (r *WALRecord) Reset() { +func (r *Record) Reset() { r.UserID = "" if len(r.Series) > 0 { r.Series = r.Series[:0] } - for _, ref := range r.RefEntries { - recordPool.PutEntries(ref.Entries) - } r.RefEntries = r.RefEntries[:0] r.entryIndexMap = make(map[uint64]int) } -func (r *WALRecord) AddEntries(fp uint64, counter int64, entries ...logproto.Entry) { +func (r *Record) AddEntries(fp uint64, counter int64, entries ...logproto.Entry) { if idx, ok := r.entryIndexMap[fp]; ok { r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...) r.RefEntries[idx].Counter = counter @@ -81,7 +78,7 @@ type RefEntries struct { Entries []logproto.Entry } -func (r *WALRecord) encodeSeries(b []byte) []byte { +func (r *Record) EncodeSeries(b []byte) []byte { buf := encoding.EncWith(b) buf.PutByte(byte(WALRecordSeries)) buf.PutUvarintStr(r.UserID) @@ -95,7 +92,7 @@ func (r *WALRecord) encodeSeries(b []byte) []byte { return encoded } -func (r *WALRecord) encodeEntries(version RecordType, b []byte) []byte { +func (r *Record) EncodeEntries(version RecordType, b []byte) []byte { buf := encoding.EncWith(b) buf.PutByte(byte(version)) buf.PutUvarintStr(r.UserID) @@ -136,7 +133,7 @@ outer: return buf.Get() } -func decodeEntries(b []byte, version RecordType, rec *WALRecord) error { +func DecodeEntries(b []byte, version RecordType, rec *Record) error { if len(b) == 0 { return nil } @@ -184,7 +181,7 @@ func decodeEntries(b []byte, version RecordType, rec *WALRecord) error { return nil } -func decodeWALRecord(b []byte, walRec *WALRecord) (err error) { +func DecodeRecord(b []byte, walRec *Record) (err error) { var ( userID string dec record.Decoder @@ -200,7 +197,7 @@ func decodeWALRecord(b []byte, walRec *WALRecord) (err 
error) { rSeries, err = dec.Series(decbuf.B, walRec.Series) case WALRecordEntriesV1, WALRecordEntriesV2: userID = decbuf.UvarintStr() - err = decodeEntries(decbuf.B, t, walRec) + err = DecodeEntries(decbuf.B, t, walRec) default: return errors.New("unknown record type") } diff --git a/pkg/ingester/wal/encoding_test.go b/pkg/ingester/wal/encoding_test.go new file mode 100644 index 0000000000..79eff31ec0 --- /dev/null +++ b/pkg/ingester/wal/encoding_test.go @@ -0,0 +1,208 @@ +package wal + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" +) + +var ( + recordPool = NewRecordPool() +) + +func Test_Encoding_Series(t *testing.T) { + record := &Record{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + Series: []record.RefSeries{ + { + Ref: 456, + Labels: labels.FromMap(map[string]string{ + "foo": "bar", + "bazz": "buzz", + }), + }, + { + Ref: 789, + Labels: labels.FromMap(map[string]string{ + "abc": "123", + "def": "456", + }), + }, + }, + } + + buf := record.EncodeSeries(nil) + + decoded := recordPool.GetRecord() + + err := DecodeRecord(buf, decoded) + require.Nil(t, err) + + // Since we use a pool, there can be subtle differentiations between nil slices and len(0) slices. + // Both are valid, so check length. 
+ require.Equal(t, 0, len(decoded.RefEntries)) + decoded.RefEntries = nil + require.Equal(t, record, decoded) +} + +func Test_Encoding_Entries(t *testing.T) { + for _, tc := range []struct { + desc string + rec *Record + version RecordType + }{ + { + desc: "v1", + rec: &Record{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ + { + Ref: 456, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + }, + }, + }, + { + Ref: 789, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3000, 0), + Line: "third", + }, + { + Timestamp: time.Unix(4000, 0), + Line: "fourth", + }, + }, + }, + }, + }, + version: WALRecordEntriesV1, + }, + { + desc: "v2", + rec: &Record{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ + { + Ref: 456, + Counter: 1, // v2 uses counter for WAL replay + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + }, + }, + }, + { + Ref: 789, + Counter: 2, // v2 uses counter for WAL replay + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3000, 0), + Line: "third", + }, + { + Timestamp: time.Unix(4000, 0), + Line: "fourth", + }, + }, + }, + }, + }, + version: WALRecordEntriesV2, + }, + } { + decoded := recordPool.GetRecord() + buf := tc.rec.EncodeEntries(tc.version, nil) + err := DecodeRecord(buf, decoded) + require.Nil(t, err) + require.Equal(t, tc.rec, decoded) + + } +} + +func Benchmark_EncodeEntries(b *testing.B) { + var entries []logproto.Entry + for i := int64(0); i < 10000; i++ { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(0, i), + Line: fmt.Sprintf("long line with a lot of data like a log %d", i), + }) + } + record := &Record{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ + { + Ref: 456, + Entries: entries, + }, + { + Ref: 789, + Entries: 
entries, + }, + }, + } + b.ReportAllocs() + b.ResetTimer() + buf := recordPool.GetBytes()[:0] + defer recordPool.PutBytes(buf) + + for n := 0; n < b.N; n++ { + record.EncodeEntries(CurrentEntriesRec, buf) + } +} + +func Benchmark_DecodeWAL(b *testing.B) { + var entries []logproto.Entry + for i := int64(0); i < 10000; i++ { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(0, i), + Line: fmt.Sprintf("long line with a lot of data like a log %d", i), + }) + } + record := &Record{ + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ + { + Ref: 456, + Entries: entries, + }, + { + Ref: 789, + Entries: entries, + }, + }, + } + + buf := record.EncodeEntries(CurrentEntriesRec, nil) + rec := recordPool.GetRecord() + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + require.NoError(b, DecodeRecord(buf, rec)) + } +} diff --git a/pkg/ingester/wal/recordpool.go b/pkg/ingester/wal/recordpool.go new file mode 100644 index 0000000000..0c74f54f21 --- /dev/null +++ b/pkg/ingester/wal/recordpool.go @@ -0,0 +1,64 @@ +package wal + +import ( + "sync" + + "github.com/grafana/loki/pkg/logproto" +) + +type ResettingPool struct { + rPool *sync.Pool // records + ePool *sync.Pool // entries + bPool *sync.Pool // bytes +} + +func NewRecordPool() *ResettingPool { + return &ResettingPool{ + rPool: &sync.Pool{ + New: func() interface{} { + return &Record{} + }, + }, + ePool: &sync.Pool{ + New: func() interface{} { + return make([]logproto.Entry, 0, 512) + }, + }, + bPool: &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 1<<10) // 1kb + }, + }, + } +} + +// GetRecord takes a Record from the pool, hands any entry slices it still references back to the entries pool, and returns it reset. +func (p *ResettingPool) GetRecord() *Record { + rec := p.rPool.Get().(*Record) + // Recycle before Reset: Reset truncates RefEntries to length zero, after which this loop would never run. + for _, ref := range rec.RefEntries { + p.PutEntries(ref.Entries) + } + rec.Reset() + return rec +} + +func (p *ResettingPool) PutRecord(r *Record) { + p.rPool.Put(r) +} + +func (p *ResettingPool) GetEntries() []logproto.Entry { + return p.ePool.Get().([]logproto.Entry) +} + +func (p 
*ResettingPool) PutEntries(es []logproto.Entry) { + p.ePool.Put(es[:0]) // nolint:staticcheck +} + +func (p *ResettingPool) GetBytes() []byte { + return p.bPool.Get().([]byte) +} + +func (p *ResettingPool) PutBytes(b []byte) { + p.bPool.Put(b[:0]) // nolint:staticcheck +}