diff --git a/go.mod b/go.mod index 86c6a0dd23..ffa5693228 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go/pubsub v1.3.1 github.com/Masterminds/sprig/v3 v3.2.2 github.com/NYTimes/gziphandler v1.1.1 + github.com/Workiva/go-datastructures v1.0.53 github.com/aws/aws-lambda-go v1.17.0 github.com/bmatcuk/doublestar v1.2.2 github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee diff --git a/go.sum b/go.sum index 4d2d0f9549..d3a2e85c16 100644 --- a/go.sum +++ b/go.sum @@ -172,6 +172,8 @@ github.com/Shopify/sarama v1.27.1/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig= +github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw= github.com/aerospike/aerospike-client-go v1.27.0/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= @@ -1344,6 +1346,7 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 
v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= @@ -1608,12 +1611,14 @@ github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1C github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 h1:j6JEOq5QWFker+d7mFQYOhjTZonQ7YkLTHm56dbn+yM= github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448 h1:hbyjqt5UnyKeOT3rFVxLxi7iTI6XqR2p4TkwEAQdUiw= github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448/go.mod h1:Q5IRRDY+cjIaiOjTAnXN5LKQV5MPqVx5ofQn85Jy5Yw= +github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= @@ -2147,6 +2152,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod 
h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE= golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20201020161133-226fd2f889ca/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= +golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201119054027-25dc3e1ccc3c/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 1c6ac2ee0b..7eaf01d565 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -32,6 +32,8 @@ const ( chunkFormatV2 chunkFormatV3 + DefaultChunkFormat = chunkFormatV3 // the currently used chunk format + blocksPerChunk = 10 maxLineLength = 1024 * 1024 * 1024 @@ -277,7 +279,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk { blocks: []block{}, head: &headBlock{}, - format: chunkFormatV3, + format: DefaultChunkFormat, encoding: enc, } @@ -350,7 +352,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { // Verify checksums. 
expCRC := binary.BigEndian.Uint32(b[blk.offset+l:]) if expCRC != crc32.Checksum(blk.b, castagnoliTable) { - level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) + _ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) continue } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index b5a254d68f..63edb14c6f 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -40,10 +40,9 @@ var testEncoding = []Encoding{ } var ( - testBlockSize = 256 * 1024 - testTargetSize = 1500 * 1024 - noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) - countExtractor = func() log.StreamSampleExtractor { + testBlockSize = 256 * 1024 + testTargetSize = 1500 * 1024 + countExtractor = func() log.StreamSampleExtractor { ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) if err != nil { panic(err) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go new file mode 100644 index 0000000000..0732a07273 --- /dev/null +++ b/pkg/chunkenc/unordered.go @@ -0,0 +1,416 @@ +package chunkenc + +import ( + "bytes" + "context" + "encoding/binary" + "io" + "math" + "sort" + "time" + + "github.com/Workiva/go-datastructures/rangetree" + "github.com/cespare/xxhash/v2" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/logqlmodel/stats" +) + +var ( + noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) +) + +type unorderedHeadBlock struct { + // Opted for range tree over skiplist for space reduction. 
+ // Inserts: O(log(n)) + // Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries + rt rangetree.RangeTree + + lines int // number of entries + size int // size of uncompressed bytes. + mint, maxt int64 // lower and upper bounds +} + +// newUnorderedHeadBlock returns an empty head block backed by a fresh range tree. +func newUnorderedHeadBlock() *unorderedHeadBlock { + return &unorderedHeadBlock{ + rt: rangetree.New(1), + } +} + +// isEmpty reports whether no entries have been appended. +// It checks the entry count rather than the byte size, because entries +// whose lines are empty strings add nothing to size but still exist. +func (hb *unorderedHeadBlock) isEmpty() bool { + return hb.lines == 0 +} + +// collection of entries belonging to the same nanosecond +type nsEntries struct { + ts int64 + entries []string +} + +// ValueAtDimension returns the timestamp regardless of the requested dimension. +func (e *nsEntries) ValueAtDimension(_ uint64) int64 { + return e.ts +} + +func (hb *unorderedHeadBlock) append(ts int64, line string) { + // This is an allocation hack. The rangetree lib does not + // support the ability to pass a "mutate" function during an insert + // and instead will displace any existing entry at the specified timestamp. + // Since Loki supports multiple lines per timestamp, + // we insert an entry without any log lines, + // which is ordered by timestamp alone. + // Then, we detect if we've displaced any existing entries, and + // append the new one to the existing, preallocated slice. + // If not, we create a slice with one entry.
+ e := &nsEntries{ + ts: ts, + } + displaced := hb.rt.Add(e) + if displaced[0] != nil { + e.entries = append(displaced[0].(*nsEntries).entries, line) + } else { + e.entries = []string{line} + } + + // Update hb metadata. The first-entry check uses the entry count, + // since size stays 0 while only empty lines have been appended. + if hb.lines == 0 || hb.mint > ts { + hb.mint = ts + } + + if hb.maxt < ts { + hb.maxt = ts + } + + hb.size += len(line) + hb.lines++ + +} + +// Implements rangetree.Interval +type interval struct { + mint, maxt int64 +} + +func (i interval) LowAtDimension(_ uint64) int64 { return i.mint } + +// rangetree library treats this as inclusive, but we want exclusivity, +// or [from, through) in nanoseconds +func (i interval) HighAtDimension(_ uint64) int64 { return i.maxt - 1 } + +// helper for base logic across {Entry,Sample}Iterator. +// It calls entryFn for every entry with mint <= ts < maxt, in timestamp order +// for FORWARD and reverse order for BACKWARD, and stops at the first error +// returned by entryFn, which it returns to the caller. +func (hb *unorderedHeadBlock) forEntries( + ctx context.Context, + direction logproto.Direction, + mint, + maxt int64, + entryFn func(int64, string) error, // returning an error exits early +) (err error) { + if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { + return + } + + entries := hb.rt.Query(interval{ + mint: mint, + maxt: maxt, + }) + + chunkStats := stats.GetChunkData(ctx) + process := func(es *nsEntries) { + chunkStats.HeadChunkLines += int64(len(es.entries)) + + // preserve write ordering of entries with the same ts + var i int + if direction == logproto.BACKWARD { + i = len(es.entries) - 1 + } + next := func() { + if direction == logproto.FORWARD { + i++ + } else { + i-- + } + } + + for ; i < len(es.entries) && i >= 0; next() { + line := es.entries[i] + chunkStats.HeadChunkBytes += int64(len(line)) + if err = entryFn(es.ts, line); err != nil { + // stop now so a later entry with the same ts cannot overwrite the error + return + } + } + } + + if direction == logproto.FORWARD { + for _, e := range entries { + process(e.(*nsEntries)) + if err != nil { + return err + } + } + } else { + for i := len(entries) - 1; i >= 0; i-- { + process(entries[i].(*nsEntries)) + if err != nil { + return err + } + } + } + + return nil +} + +func (hb *unorderedHeadBlock) iterator( + ctx context.Context, + direction
logproto.Direction, + mint, + maxt int64, + pipeline log.StreamPipeline, +) iter.EntryIterator { + + // We are doing a copy everytime, this is because b.entries could change completely, + // the alternate would be that we allocate a new b.entries everytime we cut a block, + // but the tradeoff is that queries to near-realtime data would be much lower than + // cutting of blocks. + streams := map[uint64]*logproto.Stream{} + + _ = hb.forEntries( + ctx, + direction, + mint, + maxt, + func(ts int64, line string) error { + newLine, parsedLbs, ok := pipeline.ProcessString(line) + if !ok { + return nil + } + + var stream *logproto.Stream + lhash := parsedLbs.Hash() + if stream, ok = streams[lhash]; !ok { + stream = &logproto.Stream{ + Labels: parsedLbs.String(), + } + streams[lhash] = stream + } + + stream.Entries = append(stream.Entries, logproto.Entry{ + Timestamp: time.Unix(0, ts), + Line: newLine, + }) + return nil + }, + ) + + if len(streams) == 0 { + return iter.NoopIterator + } + streamsResult := make([]logproto.Stream, 0, len(streams)) + for _, stream := range streams { + streamsResult = append(streamsResult, *stream) + } + return iter.NewStreamsIterator(ctx, streamsResult, direction) +} + +// nolint:unused +func (hb *unorderedHeadBlock) sampleIterator( + ctx context.Context, + mint, + maxt int64, + extractor log.StreamSampleExtractor, +) iter.SampleIterator { + + series := map[uint64]*logproto.Series{} + + _ = hb.forEntries( + ctx, + logproto.FORWARD, + mint, + maxt, + func(ts int64, line string) error { + value, parsedLabels, ok := extractor.ProcessString(line) + if !ok { + return nil + } + var found bool + var s *logproto.Series + lhash := parsedLabels.Hash() + if s, found = series[lhash]; !found { + s = &logproto.Series{ + Labels: parsedLabels.String(), + } + series[lhash] = s + } + + // []byte here doesn't create allocation because Sum64 has go:noescape directive + // It specifies that the function does not allow any of the pointers passed as arguments + // 
to escape into the heap or into the values returned from the function. + h := xxhash.Sum64([]byte(line)) + s.Samples = append(s.Samples, logproto.Sample{ + Timestamp: ts, + Value: value, + Hash: h, + }) + return nil + }, + ) + + if len(series) == 0 { + return iter.NoopIterator + } + seriesRes := make([]logproto.Series, 0, len(series)) + for _, s := range series { + // todo(ctovena) not sure we need this sort. + sort.Sort(s) + seriesRes = append(seriesRes, *s) + } + return iter.NewMultiSeriesIterator(ctx, seriesRes) +} + +// nolint:unused +// serialise is used in creating an ordered, compressed block from an unorderedHeadBlock +func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { + inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) + defer func() { + inBuf.Reset() + serializeBytesBufferPool.Put(inBuf) + }() + outBuf := &bytes.Buffer{} + + encBuf := make([]byte, binary.MaxVarintLen64) + compressedWriter := pool.GetWriter(outBuf) + defer pool.PutWriter(compressedWriter) + + _ = hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + func(ts int64, line string) error { + n := binary.PutVarint(encBuf, ts) + inBuf.Write(encBuf[:n]) + + n = binary.PutUvarint(encBuf, uint64(len(line))) + inBuf.Write(encBuf[:n]) + + inBuf.WriteString(line) + return nil + }, + ) + + if _, err := compressedWriter.Write(inBuf.Bytes()); err != nil { + return nil, errors.Wrap(err, "appending entry") + } + if err := compressedWriter.Close(); err != nil { + return nil, errors.Wrap(err, "flushing pending compress buffer") + } + + return outBuf.Bytes(), nil +} + +// CheckpointSize returns the estimated size of the headblock checkpoint. +func (hb *unorderedHeadBlock) CheckpointSize(version byte) int { + size := 1 // version + size += binary.MaxVarintLen32 * 2 // total entries + total size + size += binary.MaxVarintLen64 * 2 // mint,maxt + size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line. 
+ size += hb.size // uncompressed bytes of lines + return size +} + +// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing, +// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but +// needs to serialize/deserialize the data to disk to ensure data durability. +func (hb *unorderedHeadBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) { + buf := bytes.NewBuffer(b[:0]) + err := hb.CheckpointTo(version, buf) + return buf.Bytes(), err +} + +// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`. +// It writes the version byte, then the entry count, then each entry as +// (varint ts, uvarint line length, line bytes), and returns the first write error. +func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error { + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + eb.reset() + + eb.putByte(version) + _, err := w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock version") + } + eb.reset() + + eb.putUvarint(hb.lines) + + _, err = w.Write(eb.get()) + if err != nil { + return errors.Wrap(err, "write headBlock metas") + } + eb.reset() + + // Return the iteration result so a failed entry write is reported to the caller. + return hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + func(ts int64, line string) error { + eb.putVarint64(ts) + eb.putUvarint(len(line)) + if _, err := w.Write(eb.get()); err != nil { + return errors.Wrap(err, "write headBlock entry ts") + } + eb.reset() + + if _, err := io.WriteString(w, line); err != nil { + return errors.Wrap(err, "write headblock entry line") + } + return nil + }, + ) +} + +func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error { + // ensure it's empty + *hb = *newUnorderedHeadBlock() + + if len(b) < 1 { + return nil + } + + db := decbuf{b: b} + + version := db.byte() + if db.err() != nil { + return errors.Wrap(db.err(), "verifying headblock header") + } + switch version { + case chunkFormatV1, chunkFormatV2, chunkFormatV3: + default: + return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported",
version) + } + + n := db.uvarint() + + if err := db.err(); err != nil { + return errors.Wrap(err, "verifying headblock metadata") + } + + for i := 0; i < n && db.err() == nil; i++ { + ts := db.varint64() + lineLn := db.uvarint() + line := string(db.bytes(lineLn)) + hb.append(ts, line) + } + + if err := db.err(); err != nil { + return errors.Wrap(err, "decoding entries") + } + + return nil +} diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go new file mode 100644 index 0000000000..210c932085 --- /dev/null +++ b/pkg/chunkenc/unordered_test.go @@ -0,0 +1,315 @@ +package chunkenc + +import ( + "context" + "errors" + "fmt" + "math" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" +) + +func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { + var i int + for got.Next() { + require.Equal(t, logproto.Entry{ + Timestamp: time.Unix(0, exp[i].t), + Line: exp[i].s, + }, got.Entry()) + i++ + } + require.Equal(t, i, len(exp)) +} + +func Test_forEntriesEarlyReturn(t *testing.T) { + hb := newUnorderedHeadBlock() + for i := 0; i < 10; i++ { + hb.append(int64(i), fmt.Sprint(i)) + } + + // forward + var forwardCt int + var forwardStop int64 + err := hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + func(ts int64, line string) error { + forwardCt++ + forwardStop = ts + if ts == 5 { + return errors.New("err") + } + return nil + }, + ) + require.Error(t, err) + require.Equal(t, int64(5), forwardStop) + require.Equal(t, 6, forwardCt) + + // backward + var backwardCt int + var backwardStop int64 + err = hb.forEntries( + context.Background(), + logproto.BACKWARD, + 0, + math.MaxInt64, + func(ts int64, line string) error { + backwardCt++ + backwardStop = ts + if ts == 5 { + return errors.New("err") + } + return nil + }, + ) + require.Error(t, err) + require.Equal(t, int64(5), backwardStop) + require.Equal(t, 5, 
backwardCt) +} + +func Test_Unordered_InsertRetrieval(t *testing.T) { + for _, tc := range []struct { + desc string + input, exp []entry + dir logproto.Direction + }{ + { + desc: "simple forward", + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + exp: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + }, + { + desc: "simple backward", + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + exp: []entry{ + {2, "c"}, {1, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, + { + desc: "unordered forward", + input: []entry{ + {1, "b"}, {0, "a"}, {2, "c"}, + }, + exp: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, + }, + }, + { + desc: "unordered backward", + input: []entry{ + {1, "b"}, {0, "a"}, {2, "c"}, + }, + exp: []entry{ + {2, "c"}, {1, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, + { + desc: "ts collision forward", + input: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + exp: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + }, + { + desc: "ts collision backward", + input: []entry{ + {0, "a"}, {0, "b"}, {1, "c"}, + }, + exp: []entry{ + {1, "c"}, {0, "b"}, {0, "a"}, + }, + dir: logproto.BACKWARD, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + hb := newUnorderedHeadBlock() + for _, e := range tc.input { + hb.append(e.t, e.s) + } + + itr := hb.iterator( + context.Background(), + tc.dir, + 0, + math.MaxInt64, + noopStreamPipeline, + ) + + iterEq(t, tc.exp, itr) + }) + } +} + +func Test_UnorderedBoundedIter(t *testing.T) { + for _, tc := range []struct { + desc string + mint, maxt int64 + dir logproto.Direction + input []entry + exp []entry + }{ + { + desc: "simple", + mint: 1, + maxt: 4, + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"}, + }, + exp: []entry{ + {1, "b"}, {2, "c"}, {3, "d"}, + }, + }, + { + desc: "simple backward", + mint: 1, + maxt: 4, + input: []entry{ + {0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"}, + }, + exp: []entry{ + {3, "d"}, {2, "c"}, {1, "b"}, + }, + dir: logproto.BACKWARD, + }, + { + desc: "unordered", + mint: 
1, + maxt: 4, + input: []entry{ + {0, "a"}, {2, "c"}, {1, "b"}, {4, "e"}, {3, "d"}, + }, + exp: []entry{ + {1, "b"}, {2, "c"}, {3, "d"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + hb := newUnorderedHeadBlock() + for _, e := range tc.input { + hb.append(e.t, e.s) + } + + itr := hb.iterator( + context.Background(), + tc.dir, + tc.mint, + tc.maxt, + noopStreamPipeline, + ) + + iterEq(t, tc.exp, itr) + }) + } +} + +func Test_UnorderedHeadBlockCheckpointRoundtrip(t *testing.T) { + hb := newUnorderedHeadBlock() + + for i := 0; i < 100; i++ { + hb.append(int64(i), fmt.Sprint(i)) + } + + // turn to bytes + b, err := hb.CheckpointBytes(DefaultChunkFormat, nil) + require.Nil(t, err) + + // restore a copy from bytes + cpy := newUnorderedHeadBlock() + require.Nil(t, cpy.FromCheckpoint(b)) + + // ensure copy's bytes match original + cpyBytes, err := cpy.CheckpointBytes(DefaultChunkFormat, nil) + require.Nil(t, err) + require.Equal(t, b, cpyBytes) + +} + +func BenchmarkHeadBlockWrites(b *testing.B) { + // ordered, ordered + // unordered, ordered + // unordered, unordered + + // current default block size of 256kb with 75b avg log lines =~ 5.2k lines/block + var nWrites = (256 << 10) / 50 + + headBlockFn := func() func(int64, string) { + hb := &headBlock{} + return func(ts int64, line string) { + _ = hb.append(ts, line) + } + } + + unorderedHeadBlockFn := func() func(int64, string) { + hb := newUnorderedHeadBlock() + return func(ts int64, line string) { + hb.append(ts, line) + } + } + + for _, tc := range []struct { + desc string + fn func() func(int64, string) + unorderedWrites bool + }{ + { + desc: "ordered headblock ordered writes", + fn: headBlockFn, + }, + { + desc: "unordered headblock ordered writes", + fn: unorderedHeadBlockFn, + }, + { + desc: "unordered headblock unordered writes", + fn: unorderedHeadBlockFn, + unorderedWrites: true, + }, + } { + // build writes before we start benchmarking so random number generation, etc, + // isn't included in our timing 
info + writes := make([]entry, 0, nWrites) + rnd := rand.NewSource(0) + for i := 0; i < nWrites; i++ { + if tc.unorderedWrites { + ts := rnd.Int63() + writes = append(writes, entry{ + t: ts, + s: fmt.Sprint("line:", ts), + }) + } else { + writes = append(writes, entry{ + t: int64(i), + s: fmt.Sprint("line:", i), + }) + } + } + + b.Run(tc.desc, func(b *testing.B) { + for n := 0; n < b.N; n++ { + writeFn := tc.fn() + for _, w := range writes { + writeFn(w.t, w.s) + } + } + }) + } +} diff --git a/vendor/github.com/Workiva/go-datastructures/LICENSE b/vendor/github.com/Workiva/go-datastructures/LICENSE new file mode 100644 index 0000000000..7a4a3ea242 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/entries.go b/vendor/github.com/Workiva/go-datastructures/rangetree/entries.go new file mode 100644 index 0000000000..0206fd8754 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/entries.go @@ -0,0 +1,45 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +import "sync" + +var entriesPool = sync.Pool{ + New: func() interface{} { + return make(Entries, 0, 10) + }, +} + +// Entries is a typed list of Entry that can be reused if Dispose +// is called. +type Entries []Entry + +// Dispose will free the resources consumed by this list and +// allow the list to be reused. 
+func (entries *Entries) Dispose() { + for i := 0; i < len(*entries); i++ { + (*entries)[i] = nil + } + + *entries = (*entries)[:0] + entriesPool.Put(*entries) +} + +// NewEntries will return a reused list of entries. +func NewEntries() Entries { + return entriesPool.Get().(Entries) +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/error.go b/vendor/github.com/Workiva/go-datastructures/rangetree/error.go new file mode 100644 index 0000000000..3166c2cbc8 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/error.go @@ -0,0 +1,40 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +import "fmt" + +// NoEntriesError is returned from an operation that requires +// existing entries when none are found. +type NoEntriesError struct{} + +func (nee NoEntriesError) Error() string { + return `No entries in this tree.` +} + +// OutOfDimensionError is returned when a requested operation +// doesn't meet dimensional requirements. 
+type OutOfDimensionError struct { + provided, max uint64 +} + +func (oode OutOfDimensionError) Error() string { + return fmt.Sprintf(`Provided dimension: %d is + greater than max dimension: %d`, + oode.provided, oode.max, + ) +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go b/vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go new file mode 100644 index 0000000000..89b42fa11d --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go @@ -0,0 +1,275 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package rangetree + +import "github.com/Workiva/go-datastructures/slice" + +type immutableRangeTree struct { + number uint64 + top orderedNodes + dimensions uint64 +} + +func newCache(dimensions uint64) []slice.Int64Slice { + cache := make([]slice.Int64Slice, 0, dimensions-1) + for i := uint64(0); i < dimensions; i++ { + cache = append(cache, slice.Int64Slice{}) + } + return cache +} + +func (irt *immutableRangeTree) needNextDimension() bool { + return irt.dimensions > 1 +} + +func (irt *immutableRangeTree) add(nodes *orderedNodes, cache []slice.Int64Slice, entry Entry, added *uint64) { + var node *node + list := nodes + + for i := uint64(1); i <= irt.dimensions; i++ { + if isLastDimension(irt.dimensions, i) { + if i != 1 && !cache[i-1].Exists(node.value) { + nodes := make(orderedNodes, len(*list)) + copy(nodes, *list) + list = &nodes + cache[i-1].Insert(node.value) + } + + newNode := newNode(entry.ValueAtDimension(i), entry, false) + overwritten := list.add(newNode) + if overwritten == nil { + *added++ + } + if node != nil { + node.orderedNodes = *list + } + break + } + + if i != 1 && !cache[i-1].Exists(node.value) { + nodes := make(orderedNodes, len(*list)) + copy(nodes, *list) + list = &nodes + cache[i-1].Insert(node.value) + node.orderedNodes = *list + } + + node, _ = list.getOrAdd(entry, i, irt.dimensions) + list = &node.orderedNodes + } +} + +// Add will add the provided entries into the tree and return +// a new tree with those entries added. 
+func (irt *immutableRangeTree) Add(entries ...Entry) *immutableRangeTree { + if len(entries) == 0 { + return irt + } + + cache := newCache(irt.dimensions) + top := make(orderedNodes, len(irt.top)) + copy(top, irt.top) + added := uint64(0) + for _, entry := range entries { + irt.add(&top, cache, entry, &added) + } + + tree := newImmutableRangeTree(irt.dimensions) + tree.top = top + tree.number = irt.number + added + return tree +} + +// InsertAtDimension will increment items at and above the given index +// by the number provided. Provide a negative number to to decrement. +// Returned are two lists and the modified tree. The first list is a +// list of entries that were moved. The second is a list entries that +// were deleted. These lists are exclusive. +func (irt *immutableRangeTree) InsertAtDimension(dimension uint64, + index, number int64) (*immutableRangeTree, Entries, Entries) { + + if dimension > irt.dimensions || number == 0 { + return irt, nil, nil + } + + modified, deleted := make(Entries, 0, 100), make(Entries, 0, 100) + + tree := newImmutableRangeTree(irt.dimensions) + tree.top = irt.top.immutableInsert( + dimension, 1, irt.dimensions, + index, number, + &modified, &deleted, + ) + tree.number = irt.number - uint64(len(deleted)) + + return tree, modified, deleted +} + +type immutableNodeBundle struct { + list *orderedNodes + index int + previousNode *node + newNode *node +} + +func (irt *immutableRangeTree) Delete(entries ...Entry) *immutableRangeTree { + cache := newCache(irt.dimensions) + top := make(orderedNodes, len(irt.top)) + copy(top, irt.top) + deleted := uint64(0) + for _, entry := range entries { + irt.delete(&top, cache, entry, &deleted) + } + + tree := newImmutableRangeTree(irt.dimensions) + tree.top = top + tree.number = irt.number - deleted + return tree +} + +func (irt *immutableRangeTree) delete(top *orderedNodes, + cache []slice.Int64Slice, entry Entry, deleted *uint64) { + + path := make([]*immutableNodeBundle, 0, 5) + var index int + 
var n *node + var local *node + list := top + + for i := uint64(1); i <= irt.dimensions; i++ { + value := entry.ValueAtDimension(i) + local, index = list.get(value) + if local == nil { // there's nothing to delete + return + } + + nb := &immutableNodeBundle{ + list: list, + index: index, + previousNode: n, + } + path = append(path, nb) + n = local + list = &n.orderedNodes + } + + *deleted++ + + for i := len(path) - 1; i >= 0; i-- { + nb := path[i] + if nb.previousNode != nil { + nodes := make(orderedNodes, len(*nb.list)) + copy(nodes, *nb.list) + nb.list = &nodes + if len(*nb.list) == 1 { + continue + } + nn := newNode( + nb.previousNode.value, + nb.previousNode.entry, + !isLastDimension(irt.dimensions, uint64(i)+1), + ) + nn.orderedNodes = nodes + path[i-1].newNode = nn + } + } + + for _, nb := range path { + if nb.newNode == nil { + nb.list.deleteAt(nb.index) + } else { + (*nb.list)[nb.index] = nb.newNode + } + } +} + +func (irt *immutableRangeTree) apply(list orderedNodes, interval Interval, + dimension uint64, fn func(*node) bool) bool { + + low, high := interval.LowAtDimension(dimension), interval.HighAtDimension(dimension) + + if isLastDimension(irt.dimensions, dimension) { + if !list.apply(low, high, fn) { + return false + } + } else { + if !list.apply(low, high, func(n *node) bool { + if !irt.apply(n.orderedNodes, interval, dimension+1, fn) { + return false + } + return true + }) { + return false + } + return true + } + + return true +} + +// Query will return an ordered list of results in the given +// interval. 
+func (irt *immutableRangeTree) Query(interval Interval) Entries { + entries := NewEntries() + + irt.apply(irt.top, interval, 1, func(n *node) bool { + entries = append(entries, n.entry) + return true + }) + + return entries +} + +func (irt *immutableRangeTree) get(entry Entry) Entry { + on := irt.top + for i := uint64(1); i <= irt.dimensions; i++ { + n, _ := on.get(entry.ValueAtDimension(i)) + if n == nil { + return nil + } + if i == irt.dimensions { + return n.entry + } + on = n.orderedNodes + } + + return nil +} + +// Get returns any entries that exist at the addresses provided by the +// given entries. Entries are returned in the order in which they are +// received. If an entry cannot be found, a nil is returned in its +// place. +func (irt *immutableRangeTree) Get(entries ...Entry) Entries { + result := make(Entries, 0, len(entries)) + for _, entry := range entries { + result = append(result, irt.get(entry)) + } + + return result +} + +// Len returns the number of items in this tree. +func (irt *immutableRangeTree) Len() uint64 { + return irt.number +} + +func newImmutableRangeTree(dimensions uint64) *immutableRangeTree { + return &immutableRangeTree{ + dimensions: dimensions, + } +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/interface.go b/vendor/github.com/Workiva/go-datastructures/rangetree/interface.go new file mode 100644 index 0000000000..17f1e46a35 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/interface.go @@ -0,0 +1,82 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package rangetree is designed to store n-dimensional data in an easy-to-query +way. Given this package's primary use as representing cartesian data, this +information is represented by int64s at n-dimensions. This implementation +is not actually a tree but a sparse n-dimensional list. This package also +includes two implementations of this sparse list, one mutable (and not threadsafe) +and another that is immutable copy-on-write which is threadsafe. The mutable +version is obviously faster but will likely have write contention for any +consumer that needs a threadsafe rangetree. + +TODO: unify both implementations with the same interface. +*/ +package rangetree + +// Entry defines items that can be added to the rangetree. +type Entry interface { + // ValueAtDimension returns the value of this entry + // at the specified dimension. + ValueAtDimension(dimension uint64) int64 +} + +// Interval describes the methods required to query the rangetree. Note that +// all ranges are inclusive. +type Interval interface { + // LowAtDimension returns an integer representing the lower bound + // at the requested dimension. + LowAtDimension(dimension uint64) int64 + // HighAtDimension returns an integer representing the higher bound + // at the request dimension. + HighAtDimension(dimension uint64) int64 +} + +// RangeTree describes the methods available to the rangetree. +type RangeTree interface { + // Add will add the provided entries to the tree. Any entries that + // were overwritten will be returned in the order in which they + // were overwritten. If an entry's addition does not overwrite, a nil + // is returned for that entry's index in the provided cells. + Add(entries ...Entry) Entries + // Len returns the number of entries in the tree. + Len() uint64 + // Delete will remove the provided entries from the tree. 
+ // Any entries that were deleted will be returned in the order in + // which they were deleted. If an entry does not exist to be deleted, + // a nil is returned for that entry's index in the provided cells. + Delete(entries ...Entry) Entries + // Query will return a list of entries that fall within + // the provided interval. The values at dimensions are inclusive. + Query(interval Interval) Entries + // Apply will call the provided function with each entry that exists + // within the provided range, in order. Return false at any time to + // cancel iteration. Altering the entry in such a way that its location + // changes will result in undefined behavior. + Apply(interval Interval, fn func(Entry) bool) + // Get returns any entries that exist at the addresses provided by the + // given entries. Entries are returned in the order in which they are + // received. If an entry cannot be found, a nil is returned in its + // place. + Get(entries ...Entry) Entries + // InsertAtDimension will increment items at and above the given index + // by the number provided. Provide a negative number to to decrement. + // Returned are two lists. The first list is a list of entries that + // were moved. The second is a list entries that were deleted. These + // lists are exclusive. + InsertAtDimension(dimension uint64, index, number int64) (Entries, Entries) +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/node.go b/vendor/github.com/Workiva/go-datastructures/rangetree/node.go new file mode 100644 index 0000000000..83c933d70e --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/node.go @@ -0,0 +1,37 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +type nodes []*node + +type node struct { + value int64 + entry Entry + orderedNodes orderedNodes +} + +func newNode(value int64, entry Entry, needNextDimension bool) *node { + n := &node{} + n.value = value + if needNextDimension { + n.orderedNodes = make(orderedNodes, 0, 10) + } else { + n.entry = entry + } + + return n +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go b/vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go new file mode 100644 index 0000000000..6ec1b82f20 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go @@ -0,0 +1,241 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rangetree + +import "sort" + +// orderedNodes represents an ordered list of points living +// at the last dimension. No duplicates can be inserted here. 
+type orderedNodes nodes + +func (nodes orderedNodes) search(value int64) int { + return sort.Search( + len(nodes), + func(i int) bool { return nodes[i].value >= value }, + ) +} + +// addAt will add the provided node at the provided index. Returns +// a node if one was overwritten. +func (nodes *orderedNodes) addAt(i int, node *node) *node { + if i == len(*nodes) { + *nodes = append(*nodes, node) + return nil + } + + if (*nodes)[i].value == node.value { + overwritten := (*nodes)[i] + // this is a duplicate, there can't be a duplicate + // point in the last dimension + (*nodes)[i] = node + return overwritten + } + + *nodes = append(*nodes, nil) + copy((*nodes)[i+1:], (*nodes)[i:]) + (*nodes)[i] = node + return nil +} + +func (nodes *orderedNodes) add(node *node) *node { + i := nodes.search(node.value) + return nodes.addAt(i, node) +} + +func (nodes *orderedNodes) deleteAt(i int) *node { + if i >= len(*nodes) { // no matching found + return nil + } + + deleted := (*nodes)[i] + copy((*nodes)[i:], (*nodes)[i+1:]) + (*nodes)[len(*nodes)-1] = nil + *nodes = (*nodes)[:len(*nodes)-1] + return deleted +} + +func (nodes *orderedNodes) delete(value int64) *node { + i := nodes.search(value) + + if (*nodes)[i].value != value || i == len(*nodes) { + return nil + } + + return nodes.deleteAt(i) +} + +func (nodes orderedNodes) apply(low, high int64, fn func(*node) bool) bool { + index := nodes.search(low) + if index == len(nodes) { + return true + } + + for ; index < len(nodes); index++ { + if nodes[index].value > high { + break + } + + if !fn(nodes[index]) { + return false + } + } + + return true +} + +func (nodes orderedNodes) get(value int64) (*node, int) { + i := nodes.search(value) + if i == len(nodes) { + return nil, i + } + + if nodes[i].value == value { + return nodes[i], i + } + + return nil, i +} + +func (nodes *orderedNodes) getOrAdd(entry Entry, + dimension, lastDimension uint64) (*node, bool) { + + isLastDimension := isLastDimension(lastDimension, dimension) + value := 
entry.ValueAtDimension(dimension) + + i := nodes.search(value) + if i == len(*nodes) { + node := newNode(value, entry, !isLastDimension) + *nodes = append(*nodes, node) + return node, true + } + + if (*nodes)[i].value == value { + return (*nodes)[i], false + } + + node := newNode(value, entry, !isLastDimension) + *nodes = append(*nodes, nil) + copy((*nodes)[i+1:], (*nodes)[i:]) + (*nodes)[i] = node + return node, true +} + +func (nodes orderedNodes) flatten(entries *Entries) { + for _, node := range nodes { + if node.orderedNodes != nil { + node.orderedNodes.flatten(entries) + } else { + *entries = append(*entries, node.entry) + } + } +} + +func (nodes *orderedNodes) insert(insertDimension, dimension, maxDimension uint64, + index, number int64, modified, deleted *Entries) { + + lastDimension := isLastDimension(maxDimension, dimension) + + if insertDimension == dimension { + i := nodes.search(index) + var toDelete []int + + for j := i; j < len(*nodes); j++ { + (*nodes)[j].value += number + if (*nodes)[j].value < index { + toDelete = append(toDelete, j) + if lastDimension { + *deleted = append(*deleted, (*nodes)[j].entry) + } else { + (*nodes)[j].orderedNodes.flatten(deleted) + } + continue + } + if lastDimension { + *modified = append(*modified, (*nodes)[j].entry) + } else { + (*nodes)[j].orderedNodes.flatten(modified) + } + } + + for i, index := range toDelete { + nodes.deleteAt(index - i) + } + + return + } + + for _, node := range *nodes { + node.orderedNodes.insert( + insertDimension, dimension+1, maxDimension, + index, number, modified, deleted, + ) + } +} + +func (nodes orderedNodes) immutableInsert(insertDimension, dimension, maxDimension uint64, + index, number int64, modified, deleted *Entries) orderedNodes { + + lastDimension := isLastDimension(maxDimension, dimension) + + cp := make(orderedNodes, len(nodes)) + copy(cp, nodes) + + if insertDimension == dimension { + i := cp.search(index) + var toDelete []int + + for j := i; j < len(cp); j++ { + nn := 
newNode(cp[j].value+number, cp[j].entry, !lastDimension) + nn.orderedNodes = cp[j].orderedNodes + cp[j] = nn + if cp[j].value < index { + toDelete = append(toDelete, j) + if lastDimension { + *deleted = append(*deleted, cp[j].entry) + } else { + cp[j].orderedNodes.flatten(deleted) + } + continue + } + if lastDimension { + *modified = append(*modified, cp[j].entry) + } else { + cp[j].orderedNodes.flatten(modified) + } + } + + for _, index := range toDelete { + cp.deleteAt(index) + } + + return cp + } + + for i := 0; i < len(cp); i++ { + oldNode := nodes[i] + nn := newNode(oldNode.value, oldNode.entry, !lastDimension) + nn.orderedNodes = oldNode.orderedNodes.immutableInsert( + insertDimension, dimension+1, + maxDimension, + index, number, + modified, deleted, + ) + cp[i] = nn + } + + return cp +} diff --git a/vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go b/vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go new file mode 100644 index 0000000000..09a47b4484 --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go @@ -0,0 +1,263 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package rangetree + +func isLastDimension(value, test uint64) bool { + return test >= value +} + +type nodeBundle struct { + list *orderedNodes + index int +} + +type orderedTree struct { + top orderedNodes + number uint64 + dimensions uint64 + path []*nodeBundle +} + +func (ot *orderedTree) resetPath() { + ot.path = ot.path[:0] +} + +func (ot *orderedTree) needNextDimension() bool { + return ot.dimensions > 1 +} + +// add will add the provided entry to the rangetree and return an +// entry if one was overwritten. +func (ot *orderedTree) add(entry Entry) *node { + var node *node + list := &ot.top + + for i := uint64(1); i <= ot.dimensions; i++ { + if isLastDimension(ot.dimensions, i) { + overwritten := list.add( + newNode(entry.ValueAtDimension(i), entry, false), + ) + if overwritten == nil { + ot.number++ + } + return overwritten + } + node, _ = list.getOrAdd(entry, i, ot.dimensions) + list = &node.orderedNodes + } + + return nil +} + +// Add will add the provided entries to the tree. This method +// returns a list of entries that were overwritten in the order +// in which entries were received. If an entry doesn't overwrite +// anything, a nil will be returned for that entry in the returned +// slice. 
+func (ot *orderedTree) Add(entries ...Entry) Entries { + if len(entries) == 0 { + return nil + } + + overwrittens := make(Entries, len(entries)) + for i, entry := range entries { + if entry == nil { + continue + } + + overwritten := ot.add(entry) + if overwritten != nil { + overwrittens[i] = overwritten.entry + } + } + + return overwrittens +} + +func (ot *orderedTree) delete(entry Entry) *node { + ot.resetPath() + var index int + var node *node + list := &ot.top + + for i := uint64(1); i <= ot.dimensions; i++ { + value := entry.ValueAtDimension(i) + node, index = list.get(value) + if node == nil { // there's nothing to delete + return nil + } + + nb := &nodeBundle{list: list, index: index} + ot.path = append(ot.path, nb) + + list = &node.orderedNodes + } + + ot.number-- + + for i := len(ot.path) - 1; i >= 0; i-- { + nb := ot.path[i] + nb.list.deleteAt(nb.index) + if len(*nb.list) > 0 { + break + } + } + + return node +} + +func (ot *orderedTree) get(entry Entry) Entry { + on := ot.top + for i := uint64(1); i <= ot.dimensions; i++ { + n, _ := on.get(entry.ValueAtDimension(i)) + if n == nil { + return nil + } + if i == ot.dimensions { + return n.entry + } + on = n.orderedNodes + } + + return nil +} + +// Get returns any entries that exist at the addresses provided by the +// given entries. Entries are returned in the order in which they are +// received. If an entry cannot be found, a nil is returned in its +// place. +func (ot *orderedTree) Get(entries ...Entry) Entries { + result := make(Entries, 0, len(entries)) + for _, entry := range entries { + result = append(result, ot.get(entry)) + } + + return result +} + +// Delete will remove the provided entries from the tree. +// Any entries that were deleted will be returned in the order in +// which they were deleted. If an entry does not exist to be deleted, +// a nil is returned for that entry's index in the provided cells. 
+func (ot *orderedTree) Delete(entries ...Entry) Entries { + if len(entries) == 0 { + return nil + } + + deletedEntries := make(Entries, len(entries)) + for i, entry := range entries { + if entry == nil { + continue + } + + deleted := ot.delete(entry) + if deleted != nil { + deletedEntries[i] = deleted.entry + } + } + + return deletedEntries +} + +// Len returns the number of items in the tree. +func (ot *orderedTree) Len() uint64 { + return ot.number +} + +func (ot *orderedTree) apply(list orderedNodes, interval Interval, + dimension uint64, fn func(*node) bool) bool { + + low, high := interval.LowAtDimension(dimension), interval.HighAtDimension(dimension) + + if isLastDimension(ot.dimensions, dimension) { + if !list.apply(low, high, fn) { + return false + } + } else { + if !list.apply(low, high, func(n *node) bool { + if !ot.apply(n.orderedNodes, interval, dimension+1, fn) { + return false + } + return true + }) { + return false + } + return true + } + + return true +} + +// Apply will call (in order) the provided function to every +// entry that falls within the provided interval. Any alteration +// the the entry that would result in different answers to the +// interface methods results in undefined behavior. +func (ot *orderedTree) Apply(interval Interval, fn func(Entry) bool) { + ot.apply(ot.top, interval, 1, func(n *node) bool { + return fn(n.entry) + }) +} + +// Query will return an ordered list of results in the given +// interval. +func (ot *orderedTree) Query(interval Interval) Entries { + entries := NewEntries() + + ot.apply(ot.top, interval, 1, func(n *node) bool { + entries = append(entries, n.entry) + return true + }) + + return entries +} + +// InsertAtDimension will increment items at and above the given index +// by the number provided. Provide a negative number to to decrement. +// Returned are two lists. The first list is a list of entries that +// were moved. The second is a list entries that were deleted. These +// lists are exclusive. 
+func (ot *orderedTree) InsertAtDimension(dimension uint64, + index, number int64) (Entries, Entries) { + + // TODO: perhaps return an error here? + if dimension > ot.dimensions || number == 0 { + return nil, nil + } + + modified := make(Entries, 0, 100) + deleted := make(Entries, 0, 100) + + ot.top.insert(dimension, 1, ot.dimensions, + index, number, &modified, &deleted, + ) + + ot.number -= uint64(len(deleted)) + + return modified, deleted +} + +func newOrderedTree(dimensions uint64) *orderedTree { + return &orderedTree{ + dimensions: dimensions, + path: make([]*nodeBundle, 0, dimensions), + } +} + +// New is the constructor to create a new rangetree with +// the provided number of dimensions. +func New(dimensions uint64) RangeTree { + return newOrderedTree(dimensions) +} diff --git a/vendor/github.com/Workiva/go-datastructures/slice/int64.go b/vendor/github.com/Workiva/go-datastructures/slice/int64.go new file mode 100644 index 0000000000..b6526db82c --- /dev/null +++ b/vendor/github.com/Workiva/go-datastructures/slice/int64.go @@ -0,0 +1,91 @@ +/* +Copyright 2014 Workiva, LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package Int64 simply adds an Int64-typed version of the standard library's +sort/IntSlice implementation. + +Also added is an Insert method. +*/ +package slice + +import "sort" + +// Int64Slice is a slice that fulfills the sort.Interface interface. +type Int64Slice []int64 + +// Len returns the len of this slice. Required by sort.Interface. 
+func (s Int64Slice) Len() int { + return len(s) +} + +// Less returns a bool indicating if the value at position i +// is less than at position j. Required by sort.Interface. +func (s Int64Slice) Less(i, j int) bool { + return s[i] < s[j] +} + +// Search will search this slice and return an index that corresponds +// to the lowest position of that value. You'll need to check +// separately if the value at that position is equal to x. The +// behavior of this method is undefinited if the slice is not sorted. +func (s Int64Slice) Search(x int64) int { + return sort.Search(len(s), func(i int) bool { + return s[i] >= x + }) +} + +// Sort will in-place sort this list of int64s. +func (s Int64Slice) Sort() { + sort.Sort(s) +} + +// Swap will swap the elements at positions i and j. This is required +// by sort.Interface. +func (s Int64Slice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Exists returns a bool indicating if the provided value exists +// in this list. This has undefined behavior if the list is not +// sorted. +func (s Int64Slice) Exists(x int64) bool { + i := s.Search(x) + if i == len(s) { + return false + } + + return s[i] == x +} + +// Insert will insert x into the sorted position in this list +// and return a list with the value added. If this slice has not +// been sorted Insert's behavior is undefined. 
+func (s Int64Slice) Insert(x int64) Int64Slice { + i := s.Search(x) + if i == len(s) { + return append(s, x) + } + + if s[i] == x { + return s + } + + s = append(s, 0) + copy(s[i+1:], s[i:]) + s[i] = x + return s +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 748861ff7c..9122692cde 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -68,6 +68,10 @@ github.com/NYTimes/gziphandler github.com/PuerkitoBio/purell # github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 github.com/PuerkitoBio/urlesc +# github.com/Workiva/go-datastructures v1.0.53 +## explicit +github.com/Workiva/go-datastructures/rangetree +github.com/Workiva/go-datastructures/slice # github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 github.com/alecthomas/template github.com/alecthomas/template/parse