Unordered head block (#3957)

* speccing out unordered head block

* testware & unordered serialise

* common utils for iter & sampleIter

* more generic forEntries

* more efficient unorderedHeadChunk serialise (no internal re-casting)

* roundtripping unordered head block, exit headchunk iteration early, add constant for current default chunk version

* adds head block write benchmarks for ordered & unordered writes

* fixes bench

* removes unused initializer

* gofmt

* linting
Owen Diehl 4 years ago committed by GitHub
parent 6d8939670e
commit 23ad7f2e61
16 changed files (lines changed in parentheses):
  1. go.mod (1)
  2. go.sum (6)
  3. pkg/chunkenc/memchunk.go (6)
  4. pkg/chunkenc/memchunk_test.go (7)
  5. pkg/chunkenc/unordered.go (416)
  6. pkg/chunkenc/unordered_test.go (315)
  7. vendor/github.com/Workiva/go-datastructures/LICENSE (202)
  8. vendor/github.com/Workiva/go-datastructures/rangetree/entries.go (45)
  9. vendor/github.com/Workiva/go-datastructures/rangetree/error.go (40)
  10. vendor/github.com/Workiva/go-datastructures/rangetree/immutable.go (275)
  11. vendor/github.com/Workiva/go-datastructures/rangetree/interface.go (82)
  12. vendor/github.com/Workiva/go-datastructures/rangetree/node.go (37)
  13. vendor/github.com/Workiva/go-datastructures/rangetree/ordered.go (241)
  14. vendor/github.com/Workiva/go-datastructures/rangetree/orderedtree.go (263)
  15. vendor/github.com/Workiva/go-datastructures/slice/int64.go (91)
  16. vendor/modules.txt (4)
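
Before the diff itself, a quick illustration of what the new type provides. This is a minimal sketch, not code from the commit: it is written as if it lived inside package chunkenc and mirrors the new pkg/chunkenc/unordered_test.go below; the exampleUnorderedHeadBlock name is illustrative only. Appends may arrive out of order, yet the iterator returns entries sorted by timestamp.

package chunkenc

import (
	"context"
	"fmt"
	"math"

	"github.com/grafana/loki/pkg/logproto"
)

// exampleUnorderedHeadBlock appends entries out of order and reads them
// back in timestamp order via the head block's iterator.
func exampleUnorderedHeadBlock() {
	hb := newUnorderedHeadBlock()
	hb.append(2, "c")
	hb.append(0, "a")
	hb.append(1, "b")

	// Bounds are [mint, maxt); noopStreamPipeline is the package-level
	// pipeline declared in unordered.go.
	itr := hb.iterator(context.Background(), logproto.FORWARD, 0, math.MaxInt64, noopStreamPipeline)
	for itr.Next() {
		e := itr.Entry()
		fmt.Println(e.Timestamp.UnixNano(), e.Line) // prints 0 a, 1 b, 2 c
	}
}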

@@ -6,6 +6,7 @@ require (
cloud.google.com/go/pubsub v1.3.1
github.com/Masterminds/sprig/v3 v3.2.2
github.com/NYTimes/gziphandler v1.1.1
github.com/Workiva/go-datastructures v1.0.53
github.com/aws/aws-lambda-go v1.17.0
github.com/bmatcuk/doublestar v1.2.2
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee

@@ -172,6 +172,8 @@ github.com/Shopify/sarama v1.27.1/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig=
github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A=
github.com/abdullin/seq v0.0.0-20160510034733-d5467c17e7af/go.mod h1:5Jv4cbFiHJMsVxt52+i0Ha45fjshj6wxYr1r19tB9bw=
github.com/aerospike/aerospike-client-go v1.27.0/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
@@ -1344,6 +1346,7 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
@@ -1608,12 +1611,14 @@ github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1C
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 h1:j6JEOq5QWFker+d7mFQYOhjTZonQ7YkLTHm56dbn+yM=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448 h1:hbyjqt5UnyKeOT3rFVxLxi7iTI6XqR2p4TkwEAQdUiw=
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448/go.mod h1:Q5IRRDY+cjIaiOjTAnXN5LKQV5MPqVx5ofQn85Jy5Yw=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
@@ -2147,6 +2152,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201020161133-226fd2f889ca/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201119054027-25dc3e1ccc3c/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=

@@ -32,6 +32,8 @@ const (
chunkFormatV2
chunkFormatV3
DefaultChunkFormat = chunkFormatV3 // the currently used chunk format
blocksPerChunk = 10
maxLineLength = 1024 * 1024 * 1024
@@ -277,7 +279,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
blocks: []block{},
head: &headBlock{},
format: chunkFormatV3,
format: DefaultChunkFormat,
encoding: enc,
}
@@ -350,7 +352,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
// Verify checksums.
expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
_ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
continue
}

@@ -40,10 +40,9 @@ var testEncoding = []Encoding{
}
var (
testBlockSize = 256 * 1024
testTargetSize = 1500 * 1024
noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
countExtractor = func() log.StreamSampleExtractor {
testBlockSize = 256 * 1024
testTargetSize = 1500 * 1024
countExtractor = func() log.StreamSampleExtractor {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
if err != nil {
panic(err)

@@ -0,0 +1,416 @@
package chunkenc
import (
"bytes"
"context"
"encoding/binary"
"io"
"math"
"sort"
"time"
"github.com/Workiva/go-datastructures/rangetree"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
)
var (
noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
)
type unorderedHeadBlock struct {
// Opted for range tree over skiplist for space reduction.
// Inserts: O(log(n))
// Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries
rt rangetree.RangeTree
lines int // number of entries
size int // size of uncompressed bytes.
mint, maxt int64 // lower and upper bounds
}
func newUnorderedHeadBlock() *unorderedHeadBlock {
return &unorderedHeadBlock{
rt: rangetree.New(1),
}
}
func (hb *unorderedHeadBlock) isEmpty() bool {
return hb.size == 0
}
// collection of entries belonging to the same nanosecond
type nsEntries struct {
ts int64
entries []string
}
func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
return e.ts
}
func (hb *unorderedHeadBlock) append(ts int64, line string) {
// This is an allocation hack. The rangetree lib does not
// support the ability to pass a "mutate" function during an insert
// and instead will displace any existing entry at the specified timestamp.
// Since Loki supports multiple lines per timestamp,
// we insert an entry without any log lines,
// which is ordered by timestamp alone.
// Then, we detect if we've displaced any existing entries, and
// append the new one to the existing, preallocated slice.
// If not, we create a slice with one entry.
e := &nsEntries{
ts: ts,
}
displaced := hb.rt.Add(e)
if displaced[0] != nil {
e.entries = append(displaced[0].(*nsEntries).entries, line)
} else {
e.entries = []string{line}
}
// Update hb metadata
if hb.size == 0 || hb.mint > ts {
hb.mint = ts
}
if hb.maxt < ts {
hb.maxt = ts
}
hb.size += len(line)
hb.lines++
}
// Implements rangetree.Interval
type interval struct {
mint, maxt int64
}
func (i interval) LowAtDimension(_ uint64) int64 { return i.mint }
// rangetree library treats this as inclusive, but we want exclusivity,
// or [from, through) in nanoseconds
func (i interval) HighAtDimension(_ uint64) int64 { return i.maxt - 1 }
// helper for base logic across {Entry,Sample}Iterator
func (hb *unorderedHeadBlock) forEntries(
ctx context.Context,
direction logproto.Direction,
mint,
maxt int64,
entryFn func(int64, string) error, // returning an error exits early
) (err error) {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return
}
entries := hb.rt.Query(interval{
mint: mint,
maxt: maxt,
})
chunkStats := stats.GetChunkData(ctx)
process := func(es *nsEntries) {
chunkStats.HeadChunkLines += int64(len(es.entries))
// preserve write ordering of entries with the same ts
var i int
if direction == logproto.BACKWARD {
i = len(es.entries) - 1
}
next := func() {
if direction == logproto.FORWARD {
i++
} else {
i--
}
}
for ; i < len(es.entries) && i >= 0; next() {
line := es.entries[i]
chunkStats.HeadChunkBytes += int64(len(line))
err = entryFn(es.ts, line)
}
}
if direction == logproto.FORWARD {
for _, e := range entries {
process(e.(*nsEntries))
if err != nil {
return err
}
}
} else {
for i := len(entries) - 1; i >= 0; i-- {
process(entries[i].(*nsEntries))
if err != nil {
return err
}
}
}
return nil
}
func (hb *unorderedHeadBlock) iterator(
ctx context.Context,
direction logproto.Direction,
mint,
maxt int64,
pipeline log.StreamPipeline,
) iter.EntryIterator {
// We copy every time because b.entries could change completely;
// the alternative would be to allocate a new b.entries every time we cut a block,
// but the tradeoff is that queries to near-realtime data are expected to be much
// less frequent than block cuts.
streams := map[uint64]*logproto.Stream{}
_ = hb.forEntries(
ctx,
direction,
mint,
maxt,
func(ts int64, line string) error {
newLine, parsedLbs, ok := pipeline.ProcessString(line)
if !ok {
return nil
}
var stream *logproto.Stream
lhash := parsedLbs.Hash()
if stream, ok = streams[lhash]; !ok {
stream = &logproto.Stream{
Labels: parsedLbs.String(),
}
streams[lhash] = stream
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: newLine,
})
return nil
},
)
if len(streams) == 0 {
return iter.NoopIterator
}
streamsResult := make([]logproto.Stream, 0, len(streams))
for _, stream := range streams {
streamsResult = append(streamsResult, *stream)
}
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}
// nolint:unused
func (hb *unorderedHeadBlock) sampleIterator(
ctx context.Context,
mint,
maxt int64,
extractor log.StreamSampleExtractor,
) iter.SampleIterator {
series := map[uint64]*logproto.Series{}
_ = hb.forEntries(
ctx,
logproto.FORWARD,
mint,
maxt,
func(ts int64, line string) error {
value, parsedLabels, ok := extractor.ProcessString(line)
if !ok {
return nil
}
var found bool
var s *logproto.Series
lhash := parsedLabels.Hash()
if s, found = series[lhash]; !found {
s = &logproto.Series{
Labels: parsedLabels.String(),
}
series[lhash] = s
}
// []byte here doesn't create allocation because Sum64 has go:noescape directive
// It specifies that the function does not allow any of the pointers passed as arguments
// to escape into the heap or into the values returned from the function.
h := xxhash.Sum64([]byte(line))
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: ts,
Value: value,
Hash: h,
})
return nil
},
)
if len(series) == 0 {
return iter.NoopIterator
}
seriesRes := make([]logproto.Series, 0, len(series))
for _, s := range series {
// todo(ctovena) not sure we need this sort.
sort.Sort(s)
seriesRes = append(seriesRes, *s)
}
return iter.NewMultiSeriesIterator(ctx, seriesRes)
}
// nolint:unused
// serialise is used in creating an ordered, compressed block from an unorderedHeadBlock
func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
serializeBytesBufferPool.Put(inBuf)
}()
outBuf := &bytes.Buffer{}
encBuf := make([]byte, binary.MaxVarintLen64)
compressedWriter := pool.GetWriter(outBuf)
defer pool.PutWriter(compressedWriter)
_ = hb.forEntries(
context.Background(),
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
n := binary.PutVarint(encBuf, ts)
inBuf.Write(encBuf[:n])
n = binary.PutUvarint(encBuf, uint64(len(line)))
inBuf.Write(encBuf[:n])
inBuf.WriteString(line)
return nil
},
)
if _, err := compressedWriter.Write(inBuf.Bytes()); err != nil {
return nil, errors.Wrap(err, "appending entry")
}
if err := compressedWriter.Close(); err != nil {
return nil, errors.Wrap(err, "flushing pending compress buffer")
}
return outBuf.Bytes(), nil
}
// CheckpointSize returns the estimated size of the headblock checkpoint.
func (hb *unorderedHeadBlock) CheckpointSize(version byte) int {
size := 1 // version
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line.
size += hb.size // uncompressed bytes of lines
return size
}
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *unorderedHeadBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
err := hb.CheckpointTo(version, buf)
return buf.Bytes(), err
}
// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`.
func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error {
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)
eb.reset()
eb.putByte(version)
_, err := w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock version")
}
eb.reset()
eb.putUvarint(hb.lines)
_, err = w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock metas")
}
eb.reset()
err = hb.forEntries(
context.Background(),
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
eb.putVarint64(ts)
eb.putUvarint(len(line))
_, err = w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock entry ts")
}
eb.reset()
_, err := io.WriteString(w, line)
if err != nil {
return errors.Wrap(err, "write headblock entry line")
}
return nil
},
)
// propagate any error surfaced while writing entries
return err
}
func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error {
// ensure it's empty
*hb = *newUnorderedHeadBlock()
if len(b) < 1 {
return nil
}
db := decbuf{b: b}
version := db.byte()
if db.err() != nil {
return errors.Wrap(db.err(), "verifying headblock header")
}
switch version {
case chunkFormatV1, chunkFormatV2, chunkFormatV3:
default:
return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version)
}
n := db.uvarint()
if err := db.err(); err != nil {
return errors.Wrap(err, "verifying headblock metadata")
}
for i := 0; i < n && db.err() == nil; i++ {
ts := db.varint64()
lineLn := db.uvarint()
line := string(db.bytes(lineLn))
hb.append(ts, line)
}
if err := db.err(); err != nil {
return errors.Wrap(err, "decoding entries")
}
return nil
}
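
The append method above relies on the displacement behavior of the vendored rangetree, as its comment explains. Below is a standalone sketch of just that mechanic; the tsEntry type and the whole program are illustrative stand-ins (chunkenc's nsEntries is unexported) and are not part of this commit.

package main

import (
	"fmt"

	"github.com/Workiva/go-datastructures/rangetree"
)

// tsEntry is a hypothetical stand-in for chunkenc's unexported nsEntries:
// all log lines sharing a single nanosecond timestamp.
type tsEntry struct {
	ts    int64
	lines []string
}

func (e *tsEntry) ValueAtDimension(_ uint64) int64 { return e.ts }

func main() {
	rt := rangetree.New(1)

	// The first insert at ts=42 displaces nothing.
	first := &tsEntry{ts: 42, lines: []string{"line-1"}}
	_ = rt.Add(first)

	// A second insert at the same ts displaces the first entry; the new
	// entry adopts the displaced slice and appends to it, which is the
	// trick unorderedHeadBlock.append uses to keep multiple lines per ts.
	second := &tsEntry{ts: 42}
	displaced := rt.Add(second)
	if displaced[0] != nil {
		second.lines = append(displaced[0].(*tsEntry).lines, "line-2")
	} else {
		second.lines = []string{"line-2"}
	}

	fmt.Println(second.lines) // [line-1 line-2]
}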

@@ -0,0 +1,315 @@
package chunkenc
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) {
var i int
for got.Next() {
require.Equal(t, logproto.Entry{
Timestamp: time.Unix(0, exp[i].t),
Line: exp[i].s,
}, got.Entry())
i++
}
require.Equal(t, i, len(exp))
}
func Test_forEntriesEarlyReturn(t *testing.T) {
hb := newUnorderedHeadBlock()
for i := 0; i < 10; i++ {
hb.append(int64(i), fmt.Sprint(i))
}
// forward
var forwardCt int
var forwardStop int64
err := hb.forEntries(
context.Background(),
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
forwardCt++
forwardStop = ts
if ts == 5 {
return errors.New("err")
}
return nil
},
)
require.Error(t, err)
require.Equal(t, int64(5), forwardStop)
require.Equal(t, 6, forwardCt)
// backward
var backwardCt int
var backwardStop int64
err = hb.forEntries(
context.Background(),
logproto.BACKWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
backwardCt++
backwardStop = ts
if ts == 5 {
return errors.New("err")
}
return nil
},
)
require.Error(t, err)
require.Equal(t, int64(5), backwardStop)
require.Equal(t, 5, backwardCt)
}
func Test_Unordered_InsertRetrieval(t *testing.T) {
for _, tc := range []struct {
desc string
input, exp []entry
dir logproto.Direction
}{
{
desc: "simple forward",
input: []entry{
{0, "a"}, {1, "b"}, {2, "c"},
},
exp: []entry{
{0, "a"}, {1, "b"}, {2, "c"},
},
},
{
desc: "simple backward",
input: []entry{
{0, "a"}, {1, "b"}, {2, "c"},
},
exp: []entry{
{2, "c"}, {1, "b"}, {0, "a"},
},
dir: logproto.BACKWARD,
},
{
desc: "unordered forward",
input: []entry{
{1, "b"}, {0, "a"}, {2, "c"},
},
exp: []entry{
{0, "a"}, {1, "b"}, {2, "c"},
},
},
{
desc: "unordered backward",
input: []entry{
{1, "b"}, {0, "a"}, {2, "c"},
},
exp: []entry{
{2, "c"}, {1, "b"}, {0, "a"},
},
dir: logproto.BACKWARD,
},
{
desc: "ts collision forward",
input: []entry{
{0, "a"}, {0, "b"}, {1, "c"},
},
exp: []entry{
{0, "a"}, {0, "b"}, {1, "c"},
},
},
{
desc: "ts collision backward",
input: []entry{
{0, "a"}, {0, "b"}, {1, "c"},
},
exp: []entry{
{1, "c"}, {0, "b"}, {0, "a"},
},
dir: logproto.BACKWARD,
},
} {
t.Run(tc.desc, func(t *testing.T) {
hb := newUnorderedHeadBlock()
for _, e := range tc.input {
hb.append(e.t, e.s)
}
itr := hb.iterator(
context.Background(),
tc.dir,
0,
math.MaxInt64,
noopStreamPipeline,
)
iterEq(t, tc.exp, itr)
})
}
}
func Test_UnorderedBoundedIter(t *testing.T) {
for _, tc := range []struct {
desc string
mint, maxt int64
dir logproto.Direction
input []entry
exp []entry
}{
{
desc: "simple",
mint: 1,
maxt: 4,
input: []entry{
{0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"},
},
exp: []entry{
{1, "b"}, {2, "c"}, {3, "d"},
},
},
{
desc: "simple backward",
mint: 1,
maxt: 4,
input: []entry{
{0, "a"}, {1, "b"}, {2, "c"}, {3, "d"}, {4, "e"},
},
exp: []entry{
{3, "d"}, {2, "c"}, {1, "b"},
},
dir: logproto.BACKWARD,
},
{
desc: "unordered",
mint: 1,
maxt: 4,
input: []entry{
{0, "a"}, {2, "c"}, {1, "b"}, {4, "e"}, {3, "d"},
},
exp: []entry{
{1, "b"}, {2, "c"}, {3, "d"},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
hb := newUnorderedHeadBlock()
for _, e := range tc.input {
hb.append(e.t, e.s)
}
itr := hb.iterator(
context.Background(),
tc.dir,
tc.mint,
tc.maxt,
noopStreamPipeline,
)
iterEq(t, tc.exp, itr)
})
}
}
func Test_UnorderedHeadBlockCheckpointRoundtrip(t *testing.T) {
hb := newUnorderedHeadBlock()
for i := 0; i < 100; i++ {
hb.append(int64(i), fmt.Sprint(i))
}
// turn to bytes
b, err := hb.CheckpointBytes(DefaultChunkFormat, nil)
require.Nil(t, err)
// restore a copy from bytes
cpy := newUnorderedHeadBlock()
require.Nil(t, cpy.FromCheckpoint(b))
// ensure copy's bytes match original
cpyBytes, err := cpy.CheckpointBytes(DefaultChunkFormat, nil)
require.Nil(t, err)
require.Equal(t, b, cpyBytes)
}
func BenchmarkHeadBlockWrites(b *testing.B) {
// ordered, ordered
// unordered, ordered
// unordered, unordered
// current default block size of 256kb with 50b avg log lines =~ 5.2k lines/block
var nWrites = (256 << 10) / 50
headBlockFn := func() func(int64, string) {
hb := &headBlock{}
return func(ts int64, line string) {
_ = hb.append(ts, line)
}
}
unorderedHeadBlockFn := func() func(int64, string) {
hb := newUnorderedHeadBlock()
return func(ts int64, line string) {
hb.append(ts, line)
}
}
for _, tc := range []struct {
desc string
fn func() func(int64, string)
unorderedWrites bool
}{
{
desc: "ordered headblock ordered writes",
fn: headBlockFn,
},
{
desc: "unordered headblock ordered writes",
fn: unorderedHeadBlockFn,
},
{
desc: "unordered headblock unordered writes",
fn: unorderedHeadBlockFn,
unorderedWrites: true,
},
} {
// build writes before we start benchmarking so random number generation, etc,
// isn't included in our timing info
writes := make([]entry, 0, nWrites)
rnd := rand.NewSource(0)
for i := 0; i < nWrites; i++ {
if tc.unorderedWrites {
ts := rnd.Int63()
writes = append(writes, entry{
t: ts,
s: fmt.Sprint("line:", ts),
})
} else {
writes = append(writes, entry{
t: int64(i),
s: fmt.Sprint("line:", i),
})
}
}
b.Run(tc.desc, func(b *testing.B) {
for n := 0; n < b.N; n++ {
writeFn := tc.fn()
for _, w := range writes {
writeFn(w.t, w.s)
}
}
})
}
}

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@@ -0,0 +1,45 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rangetree
import "sync"
var entriesPool = sync.Pool{
New: func() interface{} {
return make(Entries, 0, 10)
},
}
// Entries is a typed list of Entry that can be reused if Dispose
// is called.
type Entries []Entry
// Dispose will free the resources consumed by this list and
// allow the list to be reused.
func (entries *Entries) Dispose() {
for i := 0; i < len(*entries); i++ {
(*entries)[i] = nil
}
*entries = (*entries)[:0]
entriesPool.Put(*entries)
}
// NewEntries will return a reused list of entries.
func NewEntries() Entries {
return entriesPool.Get().(Entries)
}

@@ -0,0 +1,40 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rangetree
import "fmt"
// NoEntriesError is returned from an operation that requires
// existing entries when none are found.
type NoEntriesError struct{}
func (nee NoEntriesError) Error() string {
return `No entries in this tree.`
}
// OutOfDimensionError is returned when a requested operation
// doesn't meet dimensional requirements.
type OutOfDimensionError struct {
provided, max uint64
}
func (oode OutOfDimensionError) Error() string {
return fmt.Sprintf(`Provided dimension: %d is
greater than max dimension: %d`,
oode.provided, oode.max,
)
}

@@ -0,0 +1,275 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rangetree
import "github.com/Workiva/go-datastructures/slice"
type immutableRangeTree struct {
number uint64
top orderedNodes
dimensions uint64
}
func newCache(dimensions uint64) []slice.Int64Slice {
cache := make([]slice.Int64Slice, 0, dimensions-1)
for i := uint64(0); i < dimensions; i++ {
cache = append(cache, slice.Int64Slice{})
}
return cache
}
func (irt *immutableRangeTree) needNextDimension() bool {
return irt.dimensions > 1
}
func (irt *immutableRangeTree) add(nodes *orderedNodes, cache []slice.Int64Slice, entry Entry, added *uint64) {
var node *node
list := nodes
for i := uint64(1); i <= irt.dimensions; i++ {
if isLastDimension(irt.dimensions, i) {
if i != 1 && !cache[i-1].Exists(node.value) {
nodes := make(orderedNodes, len(*list))
copy(nodes, *list)
list = &nodes
cache[i-1].Insert(node.value)
}
newNode := newNode(entry.ValueAtDimension(i), entry, false)
overwritten := list.add(newNode)
if overwritten == nil {
*added++
}
if node != nil {
node.orderedNodes = *list
}
break
}
if i != 1 && !cache[i-1].Exists(node.value) {
nodes := make(orderedNodes, len(*list))
copy(nodes, *list)
list = &nodes
cache[i-1].Insert(node.value)
node.orderedNodes = *list
}
node, _ = list.getOrAdd(entry, i, irt.dimensions)
list = &node.orderedNodes
}
}
// Add will add the provided entries into the tree and return
// a new tree with those entries added.
func (irt *immutableRangeTree) Add(entries ...Entry) *immutableRangeTree {
if len(entries) == 0 {
return irt
}
cache := newCache(irt.dimensions)
top := make(orderedNodes, len(irt.top))
copy(top, irt.top)
added := uint64(0)
for _, entry := range entries {
irt.add(&top, cache, entry, &added)
}
tree := newImmutableRangeTree(irt.dimensions)
tree.top = top
tree.number = irt.number + added
return tree
}
// InsertAtDimension will increment items at and above the given index
// by the number provided. Provide a negative number to decrement.
// Returned are two lists and the modified tree. The first list is a
// list of entries that were moved. The second is a list of entries that
// were deleted. These lists are exclusive.
func (irt *immutableRangeTree) InsertAtDimension(dimension uint64,
index, number int64) (*immutableRangeTree, Entries, Entries) {
if dimension > irt.dimensions || number == 0 {
return irt, nil, nil
}
modified, deleted := make(Entries, 0, 100), make(Entries, 0, 100)
tree := newImmutableRangeTree(irt.dimensions)
tree.top = irt.top.immutableInsert(
dimension, 1, irt.dimensions,
index, number,
&modified, &deleted,
)
tree.number = irt.number - uint64(len(deleted))
return tree, modified, deleted
}
type immutableNodeBundle struct {
list *orderedNodes
index int
previousNode *node
newNode *node
}
func (irt *immutableRangeTree) Delete(entries ...Entry) *immutableRangeTree {
cache := newCache(irt.dimensions)
top := make(orderedNodes, len(irt.top))
copy(top, irt.top)
deleted := uint64(0)
for _, entry := range entries {
irt.delete(&top, cache, entry, &deleted)
}
tree := newImmutableRangeTree(irt.dimensions)
tree.top = top
tree.number = irt.number - deleted
return tree
}
func (irt *immutableRangeTree) delete(top *orderedNodes,
cache []slice.Int64Slice, entry Entry, deleted *uint64) {
path := make([]*immutableNodeBundle, 0, 5)
var index int
var n *node
var local *node
list := top
for i := uint64(1); i <= irt.dimensions; i++ {
value := entry.ValueAtDimension(i)
local, index = list.get(value)
if local == nil { // there's nothing to delete
return
}
nb := &immutableNodeBundle{
list: list,
index: index,
previousNode: n,
}
path = append(path, nb)
n = local
list = &n.orderedNodes
}
*deleted++
for i := len(path) - 1; i >= 0; i-- {
nb := path[i]
if nb.previousNode != nil {
nodes := make(orderedNodes, len(*nb.list))
copy(nodes, *nb.list)
nb.list = &nodes
if len(*nb.list) == 1 {
continue
}
nn := newNode(
nb.previousNode.value,
nb.previousNode.entry,
!isLastDimension(irt.dimensions, uint64(i)+1),
)
nn.orderedNodes = nodes
path[i-1].newNode = nn
}
}
for _, nb := range path {
if nb.newNode == nil {
nb.list.deleteAt(nb.index)
} else {
(*nb.list)[nb.index] = nb.newNode
}
}
}
func (irt *immutableRangeTree) apply(list orderedNodes, interval Interval,
dimension uint64, fn func(*node) bool) bool {
low, high := interval.LowAtDimension(dimension), interval.HighAtDimension(dimension)
if isLastDimension(irt.dimensions, dimension) {
if !list.apply(low, high, fn) {
return false
}
} else {
if !list.apply(low, high, func(n *node) bool {
if !irt.apply(n.orderedNodes, interval, dimension+1, fn) {
return false
}
return true
}) {
return false
}
return true
}
return true
}
// Query will return an ordered list of results in the given
// interval.
func (irt *immutableRangeTree) Query(interval Interval) Entries {
entries := NewEntries()
irt.apply(irt.top, interval, 1, func(n *node) bool {
entries = append(entries, n.entry)
return true
})
return entries
}
func (irt *immutableRangeTree) get(entry Entry) Entry {
on := irt.top
for i := uint64(1); i <= irt.dimensions; i++ {
n, _ := on.get(entry.ValueAtDimension(i))
if n == nil {
return nil
}
if i == irt.dimensions {
return n.entry
}
on = n.orderedNodes
}
return nil
}
// Get returns any entries that exist at the addresses provided by the
// given entries. Entries are returned in the order in which they are
// received. If an entry cannot be found, a nil is returned in its
// place.
func (irt *immutableRangeTree) Get(entries ...Entry) Entries {
result := make(Entries, 0, len(entries))
for _, entry := range entries {
result = append(result, irt.get(entry))
}
return result
}
// Len returns the number of items in this tree.
func (irt *immutableRangeTree) Len() uint64 {
return irt.number
}
func newImmutableRangeTree(dimensions uint64) *immutableRangeTree {
return &immutableRangeTree{
dimensions: dimensions,
}
}

@@ -0,0 +1,82 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package rangetree is designed to store n-dimensional data in an easy-to-query
way. Given this package's primary use as representing cartesian data, this
information is represented by int64s at n-dimensions. This implementation
is not actually a tree but a sparse n-dimensional list. This package also
includes two implementations of this sparse list, one mutable (and not threadsafe)
and another that is immutable copy-on-write which is threadsafe. The mutable
version is obviously faster but will likely have write contention for any
consumer that needs a threadsafe rangetree.
TODO: unify both implementations with the same interface.
*/
package rangetree
// Entry defines items that can be added to the rangetree.
type Entry interface {
// ValueAtDimension returns the value of this entry
// at the specified dimension.
ValueAtDimension(dimension uint64) int64
}
// Interval describes the methods required to query the rangetree. Note that
// all ranges are inclusive.
type Interval interface {
// LowAtDimension returns an integer representing the lower bound
// at the requested dimension.
LowAtDimension(dimension uint64) int64
// HighAtDimension returns an integer representing the higher bound
// at the request dimension.
HighAtDimension(dimension uint64) int64
}
// RangeTree describes the methods available to the rangetree.
type RangeTree interface {
// Add will add the provided entries to the tree. Any entries that
// were overwritten will be returned in the order in which they
// were overwritten. If an entry's addition does not overwrite, a nil
// is returned for that entry's index in the provided cells.
Add(entries ...Entry) Entries
// Len returns the number of entries in the tree.
Len() uint64
// Delete will remove the provided entries from the tree.
// Any entries that were deleted will be returned in the order in
// which they were deleted. If an entry does not exist to be deleted,
// a nil is returned for that entry's index in the provided cells.
Delete(entries ...Entry) Entries
// Query will return a list of entries that fall within
// the provided interval. The values at dimensions are inclusive.
Query(interval Interval) Entries
// Apply will call the provided function with each entry that exists
// within the provided range, in order. Return false at any time to
// cancel iteration. Altering the entry in such a way that its location
// changes will result in undefined behavior.
Apply(interval Interval, fn func(Entry) bool)
// Get returns any entries that exist at the addresses provided by the
// given entries. Entries are returned in the order in which they are
// received. If an entry cannot be found, a nil is returned in its
// place.
Get(entries ...Entry) Entries
// InsertAtDimension will increment items at and above the given index
// by the number provided. Provide a negative number to decrement.
// Returned are two lists. The first list is a list of entries that
// were moved. The second is a list of entries that were deleted. These
// lists are exclusive.
InsertAtDimension(dimension uint64, index, number int64) (Entries, Entries)
}
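
For orientation, here is a small standalone sketch of how a one-dimensional consumer uses this interface, mirroring the Entry and Interval implementations added in pkg/chunkenc/unordered.go. The point and span types are hypothetical illustrations, not code from this commit.

package main

import (
	"fmt"

	"github.com/Workiva/go-datastructures/rangetree"
)

// point is a one-dimensional Entry keyed by a timestamp.
type point struct{ ts int64 }

func (p point) ValueAtDimension(_ uint64) int64 { return p.ts }

// span is an Interval. The tree treats the high bound as inclusive, so a
// caller wanting [from, to) passes to-1, just like chunkenc's interval type.
type span struct{ from, to int64 }

func (s span) LowAtDimension(_ uint64) int64  { return s.from }
func (s span) HighAtDimension(_ uint64) int64 { return s.to - 1 }

func main() {
	rt := rangetree.New(1) // one dimension: time
	rt.Add(point{ts: 30}, point{ts: 10}, point{ts: 20})

	// Query returns entries ordered by value regardless of insert order.
	for _, e := range rt.Query(span{from: 10, to: 30}) {
		fmt.Println(e.(point).ts) // prints 10 then 20; 30 is excluded
	}
}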

@@ -0,0 +1,37 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rangetree
type nodes []*node
type node struct {
value int64
entry Entry
orderedNodes orderedNodes
}
func newNode(value int64, entry Entry, needNextDimension bool) *node {
n := &node{}
n.value = value
if needNextDimension {
n.orderedNodes = make(orderedNodes, 0, 10)
} else {
n.entry = entry
}
return n
}

@@ -0,0 +1,241 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rangetree
import "sort"
// orderedNodes represents an ordered list of points living
// at the last dimension. No duplicates can be inserted here.
type orderedNodes nodes
func (nodes orderedNodes) search(value int64) int {
return sort.Search(
len(nodes),
func(i int) bool { return nodes[i].value >= value },
)
}
// addAt will add the provided node at the provided index. Returns
// a node if one was overwritten.
func (nodes *orderedNodes) addAt(i int, node *node) *node {
if i == len(*nodes) {
*nodes = append(*nodes, node)
return nil
}
if (*nodes)[i].value == node.value {
overwritten := (*nodes)[i]
// this is a duplicate, there can't be a duplicate
// point in the last dimension
(*nodes)[i] = node
return overwritten
}
*nodes = append(*nodes, nil)
copy((*nodes)[i+1:], (*nodes)[i:])
(*nodes)[i] = node
return nil
}
func (nodes *orderedNodes) add(node *node) *node {
i := nodes.search(node.value)
return nodes.addAt(i, node)
}
func (nodes *orderedNodes) deleteAt(i int) *node {
if i >= len(*nodes) { // no matching found
return nil
}
deleted := (*nodes)[i]
copy((*nodes)[i:], (*nodes)[i+1:])
(*nodes)[len(*nodes)-1] = nil
*nodes = (*nodes)[:len(*nodes)-1]
return deleted
}
func (nodes *orderedNodes) delete(value int64) *node {
i := nodes.search(value)
if (*nodes)[i].value != value || i == len(*nodes) {
return nil
}
return nodes.deleteAt(i)
}
func (nodes orderedNodes) apply(low, high int64, fn func(*node) bool) bool {
index := nodes.search(low)
if index == len(nodes) {
return true
}
for ; index < len(nodes); index++ {
if nodes[index].value > high {
break
}
if !fn(nodes[index]) {
return false
}
}
return true
}
func (nodes orderedNodes) get(value int64) (*node, int) {
i := nodes.search(value)
if i == len(nodes) {
return nil, i
}
if nodes[i].value == value {
return nodes[i], i
}
return nil, i
}
func (nodes *orderedNodes) getOrAdd(entry Entry,
dimension, lastDimension uint64) (*node, bool) {
isLastDimension := isLastDimension(lastDimension, dimension)
value := entry.ValueAtDimension(dimension)
i := nodes.search(value)
if i == len(*nodes) {
node := newNode(value, entry, !isLastDimension)
*nodes = append(*nodes, node)
return node, true
}
if (*nodes)[i].value == value {
return (*nodes)[i], false
}
node := newNode(value, entry, !isLastDimension)
*nodes = append(*nodes, nil)
copy((*nodes)[i+1:], (*nodes)[i:])
(*nodes)[i] = node
return node, true
}
func (nodes orderedNodes) flatten(entries *Entries) {
for _, node := range nodes {
if node.orderedNodes != nil {
node.orderedNodes.flatten(entries)
} else {
*entries = append(*entries, node.entry)
}
}
}
func (nodes *orderedNodes) insert(insertDimension, dimension, maxDimension uint64,
index, number int64, modified, deleted *Entries) {
lastDimension := isLastDimension(maxDimension, dimension)
if insertDimension == dimension {
i := nodes.search(index)
var toDelete []int
for j := i; j < len(*nodes); j++ {
(*nodes)[j].value += number
if (*nodes)[j].value < index {
toDelete = append(toDelete, j)
if lastDimension {
*deleted = append(*deleted, (*nodes)[j].entry)
} else {
(*nodes)[j].orderedNodes.flatten(deleted)
}
continue
}
if lastDimension {
*modified = append(*modified, (*nodes)[j].entry)
} else {
(*nodes)[j].orderedNodes.flatten(modified)
}
}
for i, index := range toDelete {
nodes.deleteAt(index - i)
}
return
}
for _, node := range *nodes {
node.orderedNodes.insert(
insertDimension, dimension+1, maxDimension,
index, number, modified, deleted,
)
}
}
func (nodes orderedNodes) immutableInsert(insertDimension, dimension, maxDimension uint64,
index, number int64, modified, deleted *Entries) orderedNodes {
lastDimension := isLastDimension(maxDimension, dimension)
cp := make(orderedNodes, len(nodes))
copy(cp, nodes)
if insertDimension == dimension {
i := cp.search(index)
var toDelete []int
for j := i; j < len(cp); j++ {
nn := newNode(cp[j].value+number, cp[j].entry, !lastDimension)
nn.orderedNodes = cp[j].orderedNodes
cp[j] = nn
if cp[j].value < index {
toDelete = append(toDelete, j)
if lastDimension {
*deleted = append(*deleted, cp[j].entry)
} else {
cp[j].orderedNodes.flatten(deleted)
}
continue
}
if lastDimension {
*modified = append(*modified, cp[j].entry)
} else {
cp[j].orderedNodes.flatten(modified)
}
}
for _, index := range toDelete {
cp.deleteAt(index)
}
return cp
}
for i := 0; i < len(cp); i++ {
oldNode := nodes[i]
nn := newNode(oldNode.value, oldNode.entry, !lastDimension)
nn.orderedNodes = oldNode.orderedNodes.immutableInsert(
insertDimension, dimension+1,
maxDimension,
index, number,
modified, deleted,
)
cp[i] = nn
}
return cp
}

@@ -0,0 +1,263 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rangetree
func isLastDimension(value, test uint64) bool {
return test >= value
}
type nodeBundle struct {
list *orderedNodes
index int
}
type orderedTree struct {
top orderedNodes
number uint64
dimensions uint64
path []*nodeBundle
}
func (ot *orderedTree) resetPath() {
ot.path = ot.path[:0]
}
func (ot *orderedTree) needNextDimension() bool {
return ot.dimensions > 1
}
// add will add the provided entry to the rangetree and return an
// entry if one was overwritten.
func (ot *orderedTree) add(entry Entry) *node {
var node *node
list := &ot.top
for i := uint64(1); i <= ot.dimensions; i++ {
if isLastDimension(ot.dimensions, i) {
overwritten := list.add(
newNode(entry.ValueAtDimension(i), entry, false),
)
if overwritten == nil {
ot.number++
}
return overwritten
}
node, _ = list.getOrAdd(entry, i, ot.dimensions)
list = &node.orderedNodes
}
return nil
}
// Add will add the provided entries to the tree. This method
// returns a list of entries that were overwritten in the order
// in which entries were received. If an entry doesn't overwrite
// anything, a nil will be returned for that entry in the returned
// slice.
func (ot *orderedTree) Add(entries ...Entry) Entries {
if len(entries) == 0 {
return nil
}
overwrittens := make(Entries, len(entries))
for i, entry := range entries {
if entry == nil {
continue
}
overwritten := ot.add(entry)
if overwritten != nil {
overwrittens[i] = overwritten.entry
}
}
return overwrittens
}
func (ot *orderedTree) delete(entry Entry) *node {
ot.resetPath()
var index int
var node *node
list := &ot.top
for i := uint64(1); i <= ot.dimensions; i++ {
value := entry.ValueAtDimension(i)
node, index = list.get(value)
if node == nil { // there's nothing to delete
return nil
}
nb := &nodeBundle{list: list, index: index}
ot.path = append(ot.path, nb)
list = &node.orderedNodes
}
ot.number--
for i := len(ot.path) - 1; i >= 0; i-- {
nb := ot.path[i]
nb.list.deleteAt(nb.index)
if len(*nb.list) > 0 {
break
}
}
return node
}
func (ot *orderedTree) get(entry Entry) Entry {
on := ot.top
for i := uint64(1); i <= ot.dimensions; i++ {
n, _ := on.get(entry.ValueAtDimension(i))
if n == nil {
return nil
}
if i == ot.dimensions {
return n.entry
}
on = n.orderedNodes
}
return nil
}
// Get returns any entries that exist at the addresses provided by the
// given entries. Entries are returned in the order in which they are
// received. If an entry cannot be found, a nil is returned in its
// place.
func (ot *orderedTree) Get(entries ...Entry) Entries {
result := make(Entries, 0, len(entries))
for _, entry := range entries {
result = append(result, ot.get(entry))
}
return result
}
// Delete will remove the provided entries from the tree.
// Any entries that were deleted will be returned in the order in
// which they were deleted. If an entry does not exist to be deleted,
// a nil is returned for that entry's index in the provided cells.
func (ot *orderedTree) Delete(entries ...Entry) Entries {
if len(entries) == 0 {
return nil
}
deletedEntries := make(Entries, len(entries))
for i, entry := range entries {
if entry == nil {
continue
}
deleted := ot.delete(entry)
if deleted != nil {
deletedEntries[i] = deleted.entry
}
}
return deletedEntries
}
// Len returns the number of items in the tree.
func (ot *orderedTree) Len() uint64 {
return ot.number
}
func (ot *orderedTree) apply(list orderedNodes, interval Interval,
dimension uint64, fn func(*node) bool) bool {
low, high := interval.LowAtDimension(dimension), interval.HighAtDimension(dimension)
if isLastDimension(ot.dimensions, dimension) {
if !list.apply(low, high, fn) {
return false
}
} else {
if !list.apply(low, high, func(n *node) bool {
if !ot.apply(n.orderedNodes, interval, dimension+1, fn) {
return false
}
return true
}) {
return false
}
return true
}
return true
}
// Apply will call (in order) the provided function to every
// entry that falls within the provided interval. Any alteration
// to the entry that would result in different answers to the
// interface methods results in undefined behavior.
func (ot *orderedTree) Apply(interval Interval, fn func(Entry) bool) {
ot.apply(ot.top, interval, 1, func(n *node) bool {
return fn(n.entry)
})
}
// Query will return an ordered list of results in the given
// interval.
func (ot *orderedTree) Query(interval Interval) Entries {
entries := NewEntries()
ot.apply(ot.top, interval, 1, func(n *node) bool {
entries = append(entries, n.entry)
return true
})
return entries
}
// InsertAtDimension will increment items at and above the given index
// by the number provided. Provide a negative number to decrement.
// Returned are two lists. The first list is a list of entries that
// were moved. The second is a list of entries that were deleted. These
// lists are exclusive.
func (ot *orderedTree) InsertAtDimension(dimension uint64,
index, number int64) (Entries, Entries) {
// TODO: perhaps return an error here?
if dimension > ot.dimensions || number == 0 {
return nil, nil
}
modified := make(Entries, 0, 100)
deleted := make(Entries, 0, 100)
ot.top.insert(dimension, 1, ot.dimensions,
index, number, &modified, &deleted,
)
ot.number -= uint64(len(deleted))
return modified, deleted
}
func newOrderedTree(dimensions uint64) *orderedTree {
return &orderedTree{
dimensions: dimensions,
path: make([]*nodeBundle, 0, dimensions),
}
}
// New is the constructor to create a new rangetree with
// the provided number of dimensions.
func New(dimensions uint64) RangeTree {
return newOrderedTree(dimensions)
}

@@ -0,0 +1,91 @@
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package Int64 simply adds an Int64-typed version of the standard library's
sort/IntSlice implementation.
Also added is an Insert method.
*/
package slice
import "sort"
// Int64Slice is a slice that fulfills the sort.Interface interface.
type Int64Slice []int64
// Len returns the len of this slice. Required by sort.Interface.
func (s Int64Slice) Len() int {
return len(s)
}
// Less returns a bool indicating if the value at position i
// is less than at position j. Required by sort.Interface.
func (s Int64Slice) Less(i, j int) bool {
return s[i] < s[j]
}
// Search will search this slice and return an index that corresponds
// to the lowest position of that value. You'll need to check
// separately if the value at that position is equal to x. The
// behavior of this method is undefined if the slice is not sorted.
func (s Int64Slice) Search(x int64) int {
return sort.Search(len(s), func(i int) bool {
return s[i] >= x
})
}
// Sort will in-place sort this list of int64s.
func (s Int64Slice) Sort() {
sort.Sort(s)
}
// Swap will swap the elements at positions i and j. This is required
// by sort.Interface.
func (s Int64Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Exists returns a bool indicating if the provided value exists
// in this list. This has undefined behavior if the list is not
// sorted.
func (s Int64Slice) Exists(x int64) bool {
i := s.Search(x)
if i == len(s) {
return false
}
return s[i] == x
}
// Insert will insert x into the sorted position in this list
// and return a list with the value added. If this slice has not
// been sorted Insert's behavior is undefined.
func (s Int64Slice) Insert(x int64) Int64Slice {
i := s.Search(x)
if i == len(s) {
return append(s, x)
}
if s[i] == x {
return s
}
s = append(s, 0)
copy(s[i+1:], s[i:])
s[i] = x
return s
}
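
A short standalone sketch, not part of the commit, of the three helpers the rangetree relies on here — Insert, Search, and Exists — operating on an already-sorted slice:

package main

import (
	"fmt"

	"github.com/Workiva/go-datastructures/slice"
)

func main() {
	s := slice.Int64Slice{10, 30}

	// Insert keeps the slice sorted and is a no-op for duplicates.
	s = s.Insert(20)
	s = s.Insert(20)
	fmt.Println(s) // [10 20 30]

	// Search returns the lowest index whose value is >= x; Exists wraps it.
	fmt.Println(s.Search(25)) // 2
	fmt.Println(s.Exists(30)) // true
}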

@@ -68,6 +68,10 @@ github.com/NYTimes/gziphandler
github.com/PuerkitoBio/purell
# github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578
github.com/PuerkitoBio/urlesc
# github.com/Workiva/go-datastructures v1.0.53
## explicit
github.com/Workiva/go-datastructures/rangetree
github.com/Workiva/go-datastructures/slice
# github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/alecthomas/template
github.com/alecthomas/template/parse
