tool for building bloom filters from log contents (#10594)

This adds a tool that builds scalable Bloom filters from the contents
of the logs themselves and records metrics about the resulting filters.
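
At a high level, the tester tokenizes each chunk's log lines into n-grams and feeds them into a per-experiment scalable Bloom filter, recording inserts, collisions, fill ratio, and filter size. A minimal sketch of that core loop, using the vendored boom package (the trigram helper below is only a stand-in for the tool's ngramTokenizer):

package main

import (
	"fmt"

	"github.com/owen-d/BoomFilters/boom"
)

// trigrams is a simplified stand-in for the tool's newNGramTokenizer(3, 4, 0).
func trigrams(line string) []string {
	r := []rune(line)
	var out []string
	for i := 0; i+3 <= len(r); i++ {
		out = append(out, string(r[i:i+3]))
	}
	return out
}

func main() {
	// same parameters the 1% error experiments use
	sbf := boom.NewScalableBloomFilter(1024, 0.01, 0.8)
	var inserts, collisions int
	for _, line := range []string{`level=info msg="starting server"`, `level=info msg="server started"`} {
		for _, tok := range trigrams(line) {
			if sbf.TestAndAdd([]byte(tok)) {
				collisions++
			}
			inserts++
		}
	}
	fmt.Printf("inserts=%d collisions=%d fill=%.3f\n", inserts, collisions, sbf.FillRatio())
}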

---------

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Owen Diehl 2 years ago committed by GitHub
parent 222ffc52d3
commit 02ae366451
28 changed files (lines changed):
  1. go.mod (1)
  2. go.sum (3)
  3. tools/tsdb/bloom-tester/concurrent.go (43)
  4. tools/tsdb/bloom-tester/lib.go (331)
  5. tools/tsdb/bloom-tester/lib_test.go (94)
  6. tools/tsdb/bloom-tester/main.go (6)
  7. tools/tsdb/bloom-tester/metrics.go (112)
  8. tools/tsdb/bloom-tester/sampler.go (34)
  9. tools/tsdb/bloom-tester/tokenizer.go (116)
  10. tools/tsdb/helpers/setup.go (102)
  11. tools/tsdb/helpers/util.go (6)
  12. tools/tsdb/index-analyzer/main.go (79)
  13. vendor/github.com/owen-d/BoomFilters/LICENSE (202)
  14. vendor/github.com/owen-d/BoomFilters/boom/boom.go (90)
  15. vendor/github.com/owen-d/BoomFilters/boom/buckets.go (190)
  16. vendor/github.com/owen-d/BoomFilters/boom/classic.go (202)
  17. vendor/github.com/owen-d/BoomFilters/boom/counting.go (158)
  18. vendor/github.com/owen-d/BoomFilters/boom/countmin.go (266)
  19. vendor/github.com/owen-d/BoomFilters/boom/cuckoo.go (269)
  20. vendor/github.com/owen-d/BoomFilters/boom/deletable.go (168)
  21. vendor/github.com/owen-d/BoomFilters/boom/hyperloglog.go (253)
  22. vendor/github.com/owen-d/BoomFilters/boom/inverse.go (269)
  23. vendor/github.com/owen-d/BoomFilters/boom/minhash.go (104)
  24. vendor/github.com/owen-d/BoomFilters/boom/partitioned.go (300)
  25. vendor/github.com/owen-d/BoomFilters/boom/scalable.go (326)
  26. vendor/github.com/owen-d/BoomFilters/boom/stable.go (333)
  27. vendor/github.com/owen-d/BoomFilters/boom/topk.go (128)
  28. vendor/modules.txt (3)

@ -120,6 +120,7 @@ require (
github.com/fsnotify/fsnotify v1.6.0
github.com/grafana/loki/pkg/push v0.0.0-20230127102416-571f88bc5765
github.com/heroku/x v0.0.61
github.com/owen-d/BoomFilters v0.0.0-20230914145927-1ad00a0ec6fd
github.com/prometheus/alertmanager v0.26.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0

@ -677,6 +677,7 @@ github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.7.2 h1:RbQacI2n+1fIOslNq/pjgOfBe1RfjAa7hqHpojopCic=
github.com/cristalhq/hedgedhttp v0.7.2/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -1479,6 +1480,8 @@ github.com/oschwald/geoip2-golang v1.9.0/go.mod h1:BHK6TvDyATVQhKNbQBdrj9eAvuwOM
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/ovh/go-ovh v1.3.0 h1:mvZaddk4E4kLcXhzb+cxBsMPYp2pHqiQpWYkInsuZPQ=
github.com/owen-d/BoomFilters v0.0.0-20230914145927-1ad00a0ec6fd h1:roU44J23uoYVo/kmCjaB6Q2lZZkLTXTXhwjwyXt8i/w=
github.com/owen-d/BoomFilters v0.0.0-20230914145927-1ad00a0ec6fd/go.mod h1:l2v6SO5wXcOsaezQ432nnBMhvNHVwVe0yUtAKKn0f6Q=
github.com/packethost/packngo v0.1.1-0.20180711074735-b9cb5096f54c/go.mod h1:otzZQXgoO96RTzDB/Hycg0qZcXZsWJGJRSXbmEIJ+4M=
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=

@ -0,0 +1,43 @@
package main
import (
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
)
type pool struct {
n int // number of workers
ch chan struct{}
}
func newPool(n int) *pool {
p := &pool{
n: n,
ch: make(chan struct{}, n),
}
// seed channel
for i := 0; i < n; i++ {
p.ch <- struct{}{}
}
return p
}
func (p *pool) acquire(
ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta,
fn func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta),
) {
<-p.ch
go func() {
fn(ls, fp, chks)
p.ch <- struct{}{}
}()
}
func (p *pool) drain() {
for i := 0; i < p.n; i++ {
<-p.ch
}
}

@ -0,0 +1,331 @@
package main
import (
"context"
"flag"
"fmt"
"math"
"runtime"
"time"
"github.com/go-kit/log/level"
"github.com/owen-d/BoomFilters/boom"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/dskit/services"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
indexshipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/tsdb"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/tools/tsdb/helpers"
)
func execute() {
conf, svc, bucket, err := helpers.Setup()
helpers.ExitErr("setting up", err)
_, overrides, clientMetrics := helpers.DefaultConfigs()
flag.Parse()
objectClient, err := storage.NewObjectClient(conf.StorageConfig.TSDBShipperConfig.SharedStoreType, conf.StorageConfig, clientMetrics)
helpers.ExitErr("creating object client", err)
chunkClient := client.NewClient(objectClient, nil, conf.SchemaConfig)
tableRanges := helpers.GetIndexStoreTableRanges(config.TSDBType, conf.SchemaConfig.Configs)
openFn := func(p string) (indexshipper_index.Index, error) {
return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{})
}
shipper, err := indexshipper.NewIndexShipper(
conf.StorageConfig.TSDBShipperConfig.Config,
objectClient,
overrides,
nil,
openFn,
tableRanges[len(tableRanges)-1],
prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", prometheus.DefaultRegisterer),
util_log.Logger,
)
helpers.ExitErr("creating index shipper", err)
tenants, tableName, err := helpers.ResolveTenants(objectClient, bucket, tableRanges)
helpers.ExitErr("resolving tenants", err)
sampler, err := NewProbabilisticSampler(0.00008)
helpers.ExitErr("creating sampler", err)
metrics := NewMetrics(prometheus.DefaultRegisterer)
level.Info(util_log.Logger).Log("msg", "starting server")
err = services.StartAndAwaitRunning(context.Background(), svc)
helpers.ExitErr("waiting for service to start", err)
level.Info(util_log.Logger).Log("msg", "server started")
err = analyze(metrics, sampler, shipper, chunkClient, tableName, tenants)
helpers.ExitErr("analyzing", err)
}
var (
three = newNGramTokenizer(3, 4, 0)
threeSkip1 = newNGramTokenizer(3, 4, 1)
threeSkip2 = newNGramTokenizer(3, 4, 2)
threeSkip3 = newNGramTokenizer(3, 4, 3)
onePctError = func() *boom.ScalableBloomFilter { return boom.NewScalableBloomFilter(1024, 0.01, 0.8) }
fivePctError = func() *boom.ScalableBloomFilter { return boom.NewScalableBloomFilter(1024, 0.05, 0.8) }
)
var experiments = []Experiment{
// n > error > skip > index
NewExperiment(
"token=3skip0_error=1%_indexchunks=true",
three,
true,
onePctError,
),
NewExperiment(
"token=3skip0_error=1%_indexchunks=false",
three,
false,
onePctError,
),
NewExperiment(
"token=3skip1_error=1%_indexchunks=true",
threeSkip1,
true,
onePctError,
),
NewExperiment(
"token=3skip1_error=1%_indexchunks=false",
threeSkip1,
false,
onePctError,
),
NewExperiment(
"token=3skip2_error=1%_indexchunks=true",
threeSkip2,
true,
onePctError,
),
NewExperiment(
"token=3skip2_error=1%_indexchunks=false",
threeSkip2,
false,
onePctError,
),
NewExperiment(
"token=3skip0_error=5%_indexchunks=true",
three,
true,
fivePctError,
),
NewExperiment(
"token=3skip0_error=5%_indexchunks=false",
three,
false,
fivePctError,
),
NewExperiment(
"token=3skip1_error=5%_indexchunks=true",
threeSkip1,
true,
fivePctError,
),
NewExperiment(
"token=3skip1_error=5%_indexchunks=false",
threeSkip1,
false,
fivePctError,
),
NewExperiment(
"token=3skip2_error=5%_indexchunks=true",
threeSkip2,
true,
fivePctError,
),
NewExperiment(
"token=3skip2_error=5%_indexchunks=false",
threeSkip2,
false,
fivePctError,
),
}
func analyze(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexShipper, client client.Client, tableName string, tenants []string) error {
metrics.tenants.Add(float64(len(tenants)))
var n int // number of chunks fetched so far (drives the periodic progress report)
reportEvery := 10 // report every n chunks
pool := newPool(runtime.NumCPU())
for _, tenant := range tenants {
err := shipper.ForEach(
context.Background(),
tableName,
tenant,
indexshipper_index.ForEachIndexCallback(func(isMultiTenantIndex bool, idx indexshipper_index.Index) error {
if isMultiTenantIndex {
return nil
}
casted := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex)
_ = casted.ForSeries(
context.Background(),
nil, model.Earliest, model.Latest,
func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
chksCpy := make([]index.ChunkMeta, len(chks))
copy(chksCpy, chks)
pool.acquire(
ls.Copy(),
fp,
chksCpy,
func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
metrics.series.Inc()
metrics.chunks.Add(float64(len(chks)))
if !sampler.Sample() {
return
}
transformed := make([]chunk.Chunk, 0, len(chks))
for _, chk := range chks {
transformed = append(transformed, chunk.Chunk{
ChunkRef: logproto.ChunkRef{
Fingerprint: uint64(fp),
UserID: tenant,
From: chk.From(),
Through: chk.Through(),
Checksum: chk.Checksum,
},
})
}
got, err := client.GetChunks(
context.Background(),
transformed,
)
helpers.ExitErr("getting chunks", err)
// record raw chunk sizes
var chunkTotalUncompressedSize int
for _, c := range got {
chunkTotalUncompressedSize += c.Data.(*chunkenc.Facade).LokiChunk().UncompressedSize()
}
metrics.chunkSize.Observe(float64(chunkTotalUncompressedSize))
n += len(got)
// iterate experiments
for experimentIdx, experiment := range experiments {
sbf := experiment.bloom()
// Iterate chunks
var (
lines, inserts, collisions float64
)
for idx := range got {
tokenizer := experiment.tokenizer
if experiment.encodeChunkID {
tokenizer = ChunkIDTokenizer(got[idx].ChunkRef, tokenizer)
}
lc := got[idx].Data.(*chunkenc.Facade).LokiChunk()
// Only report on the last experiment since they run serially
if experimentIdx == len(experiments)-1 && (n+idx+1)%reportEvery == 0 {
estimatedProgress := float64(fp) / float64(model.Fingerprint(math.MaxUint64)) * 100.
level.Info(util_log.Logger).Log(
"msg", "iterated",
"progress", fmt.Sprintf("%.2f%%", estimatedProgress),
"chunks", len(chks),
"series", ls.String(),
)
}
itr, err := lc.Iterator(
context.Background(),
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
log.NewNoopPipeline().ForStream(ls),
)
helpers.ExitErr("getting iterator", err)
for itr.Next() && itr.Error() == nil {
toks := tokenizer.Tokens(itr.Entry().Line)
lines++
for _, tok := range toks {
for _, str := range []string{tok.Key, tok.Value} {
if str != "" {
if dup := sbf.TestAndAdd([]byte(str)); dup {
collisions++
}
inserts++
}
}
}
}
helpers.ExitErr("iterating chunks", itr.Error())
}
metrics.bloomSize.WithLabelValues(experiment.name).Observe(float64(sbf.Capacity() / 8))
fillRatio := sbf.FillRatio()
metrics.hammingWeightRatio.WithLabelValues(experiment.name).Observe(fillRatio)
metrics.estimatedCount.WithLabelValues(experiment.name).Observe(
float64(estimatedCount(sbf.Capacity(), sbf.FillRatio())),
)
metrics.lines.WithLabelValues(experiment.name).Add(lines)
metrics.inserts.WithLabelValues(experiment.name).Add(inserts)
metrics.collisions.WithLabelValues(experiment.name).Add(collisions)
}
metrics.seriesKept.Inc()
metrics.chunksKept.Add(float64(len(chks)))
metrics.chunksPerSeries.Observe(float64(len(chks)))
},
)
},
labels.MustNewMatcher(labels.MatchEqual, "", ""),
)
return nil
}),
)
helpers.ExitErr(fmt.Sprintf("iterating tenant %s", tenant), err)
}
level.Info(util_log.Logger).Log("msg", "waiting for workers to finish")
pool.drain() // wait for workers to finish
level.Info(util_log.Logger).Log("msg", "waiting for final scrape")
time.Sleep(30 * time.Second) // allow final scrape
return nil
}
// n ≈ −m ln(1 − p).
func estimatedCount(m uint, p float64) uint {
return uint(-float64(m) * math.Log(1-p))
}
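
As a rough worked example of the estimator above (illustrative numbers): a filter with m = 2^23 bits at a measured fill ratio of p = 0.5 implies n ≈ m·ln 2, i.e. about 5.8 million distinct insertions.

package main

import (
	"fmt"
	"math"
)

// same estimator as above: n ≈ -m ln(1 - p)
func estimatedCount(m uint, p float64) uint {
	return uint(-float64(m) * math.Log(1-p))
}

func main() {
	fmt.Println(estimatedCount(1<<23, 0.5)) // ≈ 5.8e6
}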

@ -0,0 +1,94 @@
package main
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestNGrams(t *testing.T) {
tokenizer := newNGramTokenizer(2, 4, 0)
for _, tc := range []struct {
desc string
input string
exp []Token
}{
{
desc: "empty",
input: "",
exp: nil,
},
{
desc: "single char",
input: "a",
exp: nil,
},
{
desc: "two chars",
input: "ab",
exp: []Token{{Key: "ab", Value: ""}},
},
{
desc: "three chars",
input: "abc",
exp: []Token{{Key: "ab", Value: ""}, {Key: "bc", Value: ""}, {Key: "abc", Value: ""}},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: "ab", Value: ""}, {Key: "bc", Value: ""}, {Key: "abc", Value: ""}, {Key: "cd", Value: ""}, {Key: "bcd", Value: ""}},
},
{
desc: "multibyte runes",
input: "日本語",
exp: []Token{{Key: "日本", Value: ""}, {Key: "本語", Value: ""}, {Key: "日本語", Value: ""}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tokenizer.Tokens(tc.input))
})
}
}
func TestNGramsSkip(t *testing.T) {
twoSkipOne := newNGramTokenizer(2, 3, 1)
for _, tc := range []struct {
desc string
tokenizer *ngramTokenizer
input string
exp []Token
}{
{
desc: "four chars",
tokenizer: twoSkipOne,
input: "abcd",
exp: []Token{{Key: "ab", Value: ""}, {Key: "cd", Value: ""}},
},
{
desc: "special chars",
tokenizer: twoSkipOne,
input: "日本語",
exp: []Token{{Key: "日本", Value: ""}},
},
{
desc: "multi",
tokenizer: newNGramTokenizer(2, 4, 1),
input: "abcdefghij",
exp: []Token{
{Key: "ab", Value: ""},
{Key: "abc", Value: ""},
{Key: "cd", Value: ""},
{Key: "cde", Value: ""},
{Key: "ef", Value: ""},
{Key: "efg", Value: ""},
{Key: "gh", Value: ""},
{Key: "ghi", Value: ""},
{Key: "ij", Value: ""},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tc.tokenizer.Tokens(tc.input))
})
}
}

@ -0,0 +1,6 @@
package main
// go build ./tools/tsdb/bloom-tester && BUCKET=19580 DIR=/tmp/loki-bloom-tester ./bloom-tester --config.file=/tmp/loki-config.yaml
func main() {
execute()
}

@ -0,0 +1,112 @@
package main
import (
"github.com/owen-d/BoomFilters/boom"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type Experiment struct {
name string
tokenizer Tokenizer
bloom func() *boom.ScalableBloomFilter
encodeChunkID bool
}
func NewExperiment(name string, tokenizer Tokenizer, encodeChunkID bool, bloom func() *boom.ScalableBloomFilter) Experiment {
return Experiment{
name: name,
tokenizer: tokenizer,
bloom: bloom,
encodeChunkID: encodeChunkID,
}
}
const ExperimentLabel = "experiment"
type Metrics struct {
tenants prometheus.Counter
series prometheus.Counter // number of series
seriesKept prometheus.Counter // number of series kept
chunks prometheus.Counter // number of chunks
chunksKept prometheus.Counter // number of chunks kept
chunksPerSeries prometheus.Histogram // number of chunks per series
chunkSize prometheus.Histogram // uncompressed size of all chunks summed per series
lines *prometheus.CounterVec // number of lines processed per experiment (should be the same)
inserts *prometheus.CounterVec // number of inserts attempted into bloom filters
collisions *prometheus.CounterVec // number of inserts that collided with existing keys
hammingWeightRatio *prometheus.HistogramVec // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter
estimatedCount *prometheus.HistogramVec // estimated number of elements in the bloom filter
estimatedErrorRate *prometheus.HistogramVec // estimated error rate of the bloom filter
bloomSize *prometheus.HistogramVec // size of the bloom filter in bytes
}
func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
tenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_tenants",
Help: "Number of tenants",
}),
series: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_series",
Help: "Number of series",
}),
seriesKept: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_series_kept",
Help: "Number of series kept",
}),
chunks: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_chunks",
Help: "Number of chunks",
}),
chunksKept: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_chunks_kept",
Help: "Number of chunks kept",
}),
chunksPerSeries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "bloom_chunks_per_series",
Help: "Number of chunks per series",
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 12),
}),
chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "bloom_chunk_series_size",
Help: "Uncompressed size of chunks in a series",
Buckets: prometheus.ExponentialBucketsRange(1<<10, 1<<30, 10),
}),
lines: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "bloom_lines",
Help: "Number of lines processed",
}, []string{ExperimentLabel}),
inserts: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "bloom_inserts",
Help: "Number of inserts attempted into bloom filters",
}, []string{ExperimentLabel}),
collisions: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "bloom_collisions",
Help: "Number of inserts that collided with existing keys",
}, []string{ExperimentLabel}),
hammingWeightRatio: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "bloom_hamming_weight_ratio",
Help: "Ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter",
Buckets: prometheus.ExponentialBucketsRange(0.001, 1, 12),
}, []string{ExperimentLabel}),
estimatedCount: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "bloom_estimated_count",
Help: "Estimated number of elements in the bloom filter",
Buckets: prometheus.ExponentialBucketsRange(1, 32<<20, 10),
}, []string{ExperimentLabel}),
estimatedErrorRate: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "bloom_estimated_error_rate",
Help: "Estimated error rate of the bloom filter",
Buckets: prometheus.ExponentialBucketsRange(0.0001, 0.5, 10),
}, []string{ExperimentLabel}),
bloomSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "bloom_size",
Help: "Size of the bloom filter in bytes",
Buckets: prometheus.ExponentialBucketsRange(128, 16<<20, 8),
}, []string{ExperimentLabel}),
}
}

@ -0,0 +1,34 @@
package main
import (
"errors"
"math/rand"
)
type Sampler interface {
Sample() bool
}
func NewProbabilisticSampler(p float64) (*ProbabilisticSampler, error) {
if p < 0 || p > 1 {
return &ProbabilisticSampler{}, errors.New("invalid probability, must be between 0 and 1")
}
return &ProbabilisticSampler{
p: p,
rng: rand.New(rand.NewSource(0)), // always use deterministic seed so identical instantiations sample the same way
}, nil
}
// ProbabilisticSampler is a Sampler that keeps each item with probability p.
type ProbabilisticSampler struct {
p float64
rng *rand.Rand
}
func (s *ProbabilisticSampler) Sample() bool {
scale := 1e6
x := s.rng.Intn(int(scale))
return float64(x) < s.p*scale
}
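
A hypothetical test (names assumed, same package, imports as in lib_test.go) that pins down what p = 0.00008 means in practice: roughly 80 of every million calls return true, and the fixed seed makes the selection repeatable across runs.

func TestProbabilisticSamplerRate(t *testing.T) {
	s, err := NewProbabilisticSampler(0.00008) // same rate execute() passes in
	require.NoError(t, err)
	kept := 0
	for i := 0; i < 1_000_000; i++ {
		if s.Sample() {
			kept++
		}
	}
	// fixed seed => deterministic result; the expected value is 80 per million
	require.InDelta(t, 80, kept, 60)
}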

@ -0,0 +1,116 @@
package main
import (
"fmt"
"unicode/utf8"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
)
type Token struct {
// Either key or value may be empty
Key, Value string
}
type Tokenizer interface {
Tokens(line string) []Token
}
type logfmtTokenizer struct {
parser *log.LogfmtParser
lbls *log.LabelsBuilder
}
func (t *logfmtTokenizer) Tokens(line string) []Token {
t.lbls.Reset()
t.parser.Process(0, []byte(line), t.lbls)
ls := t.lbls.LabelsResult().Labels()
res := make([]Token, 0, len(ls))
for _, l := range ls {
res = append(res, Token{Key: l.Name, Value: l.Value})
}
return res
}
func newLogfmtTokenizer() *logfmtTokenizer {
return &logfmtTokenizer{
// non strict, allow empty values
parser: log.NewLogfmtParser(false, true),
lbls: log.NewBaseLabelsBuilder().ForLabels(nil, 0),
}
}
type ngramTokenizer struct {
// [min,max) exclusivity
min, max, skip int
buffers [][]rune // circular buffers used for ngram generation
}
func newNGramTokenizer(min, max, skip int) *ngramTokenizer {
t := &ngramTokenizer{
min: min,
max: max,
skip: skip,
}
for i := t.min; i < t.max; i++ {
t.buffers = append(t.buffers, make([]rune, i))
}
return t
}
func (t *ngramTokenizer) Tokens(line string) (res []Token) {
var i int // rune index (not the byte offset yielded by the range loop)
for _, r := range line {
// j is the index of the buffer to use
for j := 0; j < (t.max - t.min); j++ {
// n is the length of the ngram
n := j + t.min
// pos is the position in the buffer to overwrite
pos := i % n
t.buffers[j][pos] = r
if i >= n-1 && (i+1-n)%(t.skip+1) == 0 {
ngram := reassemble(t.buffers[j], (i+1)%n)
res = append(res, Token{Key: string(ngram), Value: ""})
}
}
i++
}
return
}
func reassemble(buf []rune, pos int) []byte {
res := make([]byte, 0, len(buf)*4) // a rune is at most 4 bytes in UTF-8
for i := 0; i < len(buf); i++ {
cur := (pos + i) % len(buf)
res = utf8.AppendRune(res, buf[cur])
}
return res
}
type WrappedTokenizer struct {
t Tokenizer
f func(Token) Token
}
func (w *WrappedTokenizer) Tokens(line string) []Token {
toks := w.t.Tokens(line)
res := make([]Token, 0, len(toks))
for _, tok := range toks {
res = append(res, w.f(tok))
}
return append(res, toks...)
}
func ChunkIDTokenizer(chk logproto.ChunkRef, t Tokenizer) *WrappedTokenizer {
return &WrappedTokenizer{
t: t,
f: func(tok Token) Token {
tok.Key = fmt.Sprintf("%d:%d:%d:%s", chk.From, chk.Through, chk.Checksum, tok.Key)
return tok
},
}
}
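
A hypothetical test (same package, imports as in lib_test.go plus logproto) illustrating the wrapper above: WrappedTokenizer emits every n-gram twice, first prefixed with the chunk's from:through:checksum and then unmodified.

func TestChunkIDTokenizer(t *testing.T) {
	chk := logproto.ChunkRef{From: 1, Through: 2, Checksum: 3}
	tok := ChunkIDTokenizer(chk, newNGramTokenizer(3, 4, 0))

	var keys []string
	for _, tk := range tok.Tokens("abcd") {
		keys = append(keys, tk.Key)
	}
	// prefixed tokens come first, followed by the raw n-grams
	require.Equal(t, []string{"1:2:3:abc", "1:2:3:bcd", "abc", "bcd"}, keys)
}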

@ -0,0 +1,102 @@
package helpers
import (
"flag"
"fmt"
"os"
"path/filepath"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/common/version"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
func Setup() (loki.Config, services.Service, string, error) {
var c loki.ConfigWrapper
if err := cfg.DynamicUnmarshal(&c, os.Args[1:], flag.CommandLine); err != nil {
fmt.Fprintf(os.Stderr, "failed parsing config: %v\n", err)
os.Exit(1)
}
bucket := os.Getenv("BUCKET")
dir := os.Getenv("DIR")
if bucket == "" {
return c.Config, nil, "", fmt.Errorf("$BUCKET must be specified")
}
if dir == "" {
return c.Config, nil, "", fmt.Errorf("$DIR must be specified")
}
if err := util.EnsureDirectory(dir); err != nil {
return c.Config, nil, "", fmt.Errorf("failed to ensure directory %s: %w", dir, err)
}
c.Config.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
util_log.InitLogger(&c.Server, prometheus.DefaultRegisterer, c.UseBufferedLogger, c.UseSyncLogger)
c.Config.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory = filepath.Join(dir, "tsdb-active")
c.Config.StorageConfig.TSDBShipperConfig.CacheLocation = filepath.Join(dir, "tsdb-cache")
svc, err := moduleManager(&c.Config.Server)
if err != nil {
return c.Config, nil, "", err
}
return c.Config, svc, bucket, nil
}
func moduleManager(cfg *server.Config) (services.Service, error) {
prometheus.MustRegister(version.NewCollector("loki"))
// unregister default go collector
prometheus.Unregister(collectors.NewGoCollector())
// register collector with additional metrics
prometheus.MustRegister(collectors.NewGoCollector(
collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll),
))
if cfg.HTTPListenPort == 0 {
cfg.HTTPListenPort = 8080
}
serv, err := server.New(*cfg)
if err != nil {
return nil, err
}
s := loki.NewServerService(serv, func() []services.Service { return nil })
return s, nil
}
func DefaultConfigs() (config.ChunkStoreConfig, *validation.Overrides, storage.ClientMetrics) {
var (
chunkStoreConfig config.ChunkStoreConfig
limits validation.Limits
clientMetrics storage.ClientMetrics
)
chunkStoreConfig.RegisterFlags(flag.NewFlagSet("chunk-store", flag.PanicOnError))
limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError))
overrides, _ := validation.NewOverrides(limits, nil)
return chunkStoreConfig, overrides, clientMetrics
}
func ExitErr(during string, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "encountered error during %s: %v\n", during, err)
os.Exit(1)
}
}

@ -1,4 +1,4 @@
package main
package helpers
import (
"context"
@ -49,7 +49,7 @@ func getTableNumberForTime(t model.Time) int64 {
}
// copied from storage/store.go
func getIndexStoreTableRanges(indexType string, periodicConfigs []config.PeriodConfig) config.TableRanges {
func GetIndexStoreTableRanges(indexType string, periodicConfigs []config.PeriodConfig) config.TableRanges {
var ranges config.TableRanges
for i := range periodicConfigs {
if periodicConfigs[i].IndexType != indexType {
@ -67,7 +67,7 @@ func getIndexStoreTableRanges(indexType string, periodicConfigs []config.PeriodC
return ranges
}
func resolveTenants(objectClient client.ObjectClient, bucket string, tableRanges config.TableRanges) ([]string, string, error) {
func ResolveTenants(objectClient client.ObjectClient, bucket string, tableRanges config.TableRanges) ([]string, string, error) {
if bucket == "" {
return nil, "", errors.New("empty bucket")
}

@ -2,37 +2,31 @@ package main
import (
"flag"
"fmt"
"os"
"path/filepath"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
indexshipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/tsdb"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
"github.com/grafana/loki/tools/tsdb/helpers"
)
// go build ./tools/tsdb/index-analyzer && BUCKET=19453 DIR=/tmp/loki-index-analysis ./index-analyzer --config.file=/tmp/loki-config.yaml
func main() {
conf, bucket, err := setup()
exitErr("setting up", err)
conf, _, bucket, err := helpers.Setup()
helpers.ExitErr("setting up", err)
_, overrides, clientMetrics := defaultConfigs()
_, overrides, clientMetrics := helpers.DefaultConfigs()
flag.Parse()
objectClient, err := storage.NewObjectClient(conf.StorageConfig.TSDBShipperConfig.SharedStoreType, conf.StorageConfig, clientMetrics)
exitErr("creating object client", err)
helpers.ExitErr("creating object client", err)
tableRanges := getIndexStoreTableRanges(config.TSDBType, conf.SchemaConfig.Configs)
tableRanges := helpers.GetIndexStoreTableRanges(config.TSDBType, conf.SchemaConfig.Configs)
openFn := func(p string) (indexshipper_index.Index, error) {
return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{})
@ -48,64 +42,11 @@ func main() {
prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", prometheus.DefaultRegisterer),
util_log.Logger,
)
exitErr("creating index shipper", err)
helpers.ExitErr("creating index shipper", err)
tenants, tableName, err := resolveTenants(objectClient, bucket, tableRanges)
exitErr("resolving tenants", err)
tenants, tableName, err := helpers.ResolveTenants(objectClient, bucket, tableRanges)
helpers.ExitErr("resolving tenants", err)
err = analyze(shipper, tableName, tenants)
exitErr("analyzing", err)
}
func exitErr(during string, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "encountered error during %s: %v\n", during, err)
os.Exit(1)
}
}
func setup() (loki.Config, string, error) {
var c loki.ConfigWrapper
if err := cfg.DynamicUnmarshal(&c, os.Args[1:], flag.CommandLine); err != nil {
fmt.Fprintf(os.Stderr, "bonk:%T, %+v", err, err)
fmt.Fprintf(os.Stderr, "failed parsing config: %v\n", err)
os.Exit(1)
}
bucket := os.Getenv("BUCKET")
dir := os.Getenv("DIR")
if bucket == "" {
return c.Config, "", fmt.Errorf("$BUCKET must be specified")
}
if dir == "" {
return c.Config, "", fmt.Errorf("$DIR must be specified")
}
if err := util.EnsureDirectory(dir); err != nil {
return c.Config, "", fmt.Errorf("failed to ensure directory %s: %w", dir, err)
}
c.Config.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
serverCfg := &c.Server
serverCfg.Log = util_log.InitLogger(serverCfg, prometheus.DefaultRegisterer, c.UseBufferedLogger, c.UseSyncLogger)
c.Config.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory = filepath.Join(dir, "tsdb-active")
c.Config.StorageConfig.TSDBShipperConfig.CacheLocation = filepath.Join(dir, "tsdb-cache")
return c.Config, bucket, nil
}
func defaultConfigs() (config.ChunkStoreConfig, *validation.Overrides, storage.ClientMetrics) {
var (
chunkStoreConfig config.ChunkStoreConfig
limits validation.Limits
clientMetrics storage.ClientMetrics
)
chunkStoreConfig.RegisterFlags(flag.NewFlagSet("chunk-store", flag.PanicOnError))
limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError))
overrides, _ := validation.NewOverrides(limits, nil)
return chunkStoreConfig, overrides, clientMetrics
helpers.ExitErr("analyzing", err)
}

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@ -0,0 +1,90 @@
/*
Package boom implements probabilistic data structures for processing
continuous, unbounded data streams. This includes Stable Bloom Filters,
Scalable Bloom Filters, Counting Bloom Filters, Inverse Bloom Filters, several
variants of traditional Bloom filters, HyperLogLog, Count-Min Sketch, and
MinHash.
Classic Bloom filters generally require a priori knowledge of the data set
in order to allocate an appropriately sized bit array. This works well for
offline processing, but online processing typically involves unbounded data
streams. With enough data, a traditional Bloom filter "fills up", after
which it has a false-positive probability of 1.
Boom Filters are useful for situations where the size of the data set isn't
known ahead of time. For example, a Stable Bloom Filter can be used to
deduplicate events from an unbounded event stream with a specified upper
bound on false positives and minimal false negatives. Alternatively, an
Inverse Bloom Filter is ideal for deduplicating a stream where duplicate
events are relatively close together. This results in no false positives
and, depending on how close together duplicates are, a small probability of
false negatives. Scalable Bloom Filters place a tight upper bound on false
positives while avoiding false negatives but require allocating memory
proportional to the size of the data set. Counting Bloom Filters and Cuckoo
Filters are useful for cases which require adding and removing elements to and
from a set.
For large or unbounded data sets, calculating the exact cardinality is
impractical. HyperLogLog uses a fraction of the memory while providing an
accurate approximation. Similarly, Count-Min Sketch provides an efficient way
to estimate event frequency for data streams. TopK tracks the top-k most
frequent elements.
MinHash is a probabilistic algorithm to approximate the similarity between two
sets. This can be used to cluster or compare documents by splitting the corpus
into a bag of words.
*/
package boom
import (
"hash"
"math"
)
// optimal fill ratio
const fillRatio = 0.5
// Filter is a probabilistic data structure which is used to test the
// membership of an element in a set.
type Filter interface {
// Test will test for membership of the data and returns true if it is a
// member, false if not.
Test([]byte) bool
// Add will add the data to the Bloom filter. It returns the filter to
// allow for chaining.
Add([]byte) Filter
// TestAndAdd is equivalent to calling Test followed by Add. It returns
// true if the data is a member, false if not.
TestAndAdd([]byte) bool
}
// OptimalM calculates the optimal Bloom filter size, m, based on the number of
// items and the desired rate of false positives.
func OptimalM(n uint, fpRate float64) uint {
return uint(math.Ceil(float64(n) / ((math.Log(fillRatio) *
math.Log(1-fillRatio)) / math.Abs(math.Log(fpRate)))))
}
// OptimalK calculates the optimal number of hash functions to use for a Bloom
// filter based on the desired rate of false positives.
func OptimalK(fpRate float64) uint {
return uint(math.Ceil(math.Log2(1 / fpRate)))
}
// n ≈ −m ln(1 − p).
func estimatedCount(m uint, p float64) uint {
return uint(-float64(m) * math.Log(1-p))
}
// hashKernel returns the upper and lower base hash values from which the k
// hashes are derived.
func hashKernel(data []byte, hash hash.Hash64) (uint32, uint32) {
hash.Write(data)
sum := hash.Sum64()
hash.Reset()
upper := uint32(sum & 0xffffffff)
lower := uint32((sum >> 32) & 0xffffffff)
return upper, lower
}
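
To make the sizing helpers concrete (illustrative numbers): targeting one million items at a 1% false-positive rate, OptimalM works out to roughly 9.6 million bits (about 1.2 MB) and OptimalK to 7 hash functions.

package main

import (
	"fmt"

	"github.com/owen-d/BoomFilters/boom"
)

func main() {
	m := boom.OptimalM(1_000_000, 0.01) // ≈ 9.6e6 bits (~1.2 MB)
	k := boom.OptimalK(0.01)            // ceil(log2(1/0.01)) = 7
	fmt.Println(m, k)
}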

@ -0,0 +1,190 @@
package boom
import (
"bytes"
"encoding/binary"
"io"
"math/bits"
)
// Buckets is a fast, space-efficient array of buckets where each bucket can
// store up to a configured maximum value.
type Buckets struct {
data []byte
bucketSize uint8
max uint8
count uint
}
// NewBuckets creates a new Buckets with the provided number of buckets where
// each bucket is the specified number of bits.
func NewBuckets(count uint, bucketSize uint8) *Buckets {
return &Buckets{
count: count,
data: make([]byte, (count*uint(bucketSize)+7)/8),
bucketSize: bucketSize,
max: (1 << bucketSize) - 1,
}
}
// MaxBucketValue returns the maximum value that can be stored in a bucket.
func (b *Buckets) MaxBucketValue() uint8 {
return b.max
}
// Count returns the number of buckets.
func (b *Buckets) Count() uint {
return b.count
}
// Increment will increment the value in the specified bucket by the provided
// delta. A bucket can be decremented by providing a negative delta. The value
// is clamped to zero and the maximum bucket value. Returns itself to allow for
// chaining.
func (b *Buckets) Increment(bucket uint, delta int32) *Buckets {
val := int32(b.getBits(bucket*uint(b.bucketSize), uint(b.bucketSize))) + delta
if val > int32(b.max) {
val = int32(b.max)
} else if val < 0 {
val = 0
}
b.setBits(uint32(bucket)*uint32(b.bucketSize), uint32(b.bucketSize), uint32(val))
return b
}
// Set will set the bucket value. The value is clamped to zero and the maximum
// bucket value. Returns itself to allow for chaining.
func (b *Buckets) Set(bucket uint, value uint8) *Buckets {
if value > b.max {
value = b.max
}
b.setBits(uint32(bucket)*uint32(b.bucketSize), uint32(b.bucketSize), uint32(value))
return b
}
// Get returns the value in the specified bucket.
func (b *Buckets) Get(bucket uint) uint32 {
return b.getBits(bucket*uint(b.bucketSize), uint(b.bucketSize))
}
// PopCount returns the number of set bits across all buckets.
func (b *Buckets) PopCount() (count int) {
for _, x := range b.data {
count += bits.OnesCount8(uint8(x))
}
return count
}
// Reset restores the Buckets to the original state. Returns itself to allow
// for chaining.
func (b *Buckets) Reset() *Buckets {
b.data = make([]byte, (b.count*uint(b.bucketSize)+7)/8)
return b
}
// getBits returns the bits at the specified offset and length.
func (b *Buckets) getBits(offset, length uint) uint32 {
byteIndex := offset / 8
byteOffset := offset % 8
if byteOffset+length > 8 {
rem := 8 - byteOffset
return b.getBits(offset, rem) | (b.getBits(offset+rem, length-rem) << rem)
}
bitMask := uint32((1 << length) - 1)
return (uint32(b.data[byteIndex]) & (bitMask << byteOffset)) >> byteOffset
}
// setBits sets bits at the specified offset and length.
func (b *Buckets) setBits(offset, length, bits uint32) {
byteIndex := offset / 8
byteOffset := offset % 8
if byteOffset+length > 8 {
rem := 8 - byteOffset
b.setBits(offset, rem, bits)
b.setBits(offset+rem, length-rem, bits>>rem)
return
}
bitMask := uint32((1 << length) - 1)
b.data[byteIndex] = byte(uint32(b.data[byteIndex]) & ^(bitMask << byteOffset))
b.data[byteIndex] = byte(uint32(b.data[byteIndex]) | ((bits & bitMask) << byteOffset))
}
// WriteTo writes a binary representation of Buckets to an i/o stream.
// It returns the number of bytes written.
func (b *Buckets) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, b.bucketSize)
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, b.max)
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(b.count))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(len(b.data)))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, b.data)
if err != nil {
return 0, err
}
return int64(len(b.data) + 2*binary.Size(uint8(0)) + 2*binary.Size(uint64(0))), err
}
// ReadFrom reads a binary representation of Buckets (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (b *Buckets) ReadFrom(stream io.Reader) (int64, error) {
var bucketSize, max uint8
var count, len uint64
err := binary.Read(stream, binary.BigEndian, &bucketSize)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &max)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &count)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &len)
if err != nil {
return 0, err
}
data := make([]byte, len)
err = binary.Read(stream, binary.BigEndian, &data)
if err != nil {
return 0, err
}
b.bucketSize = bucketSize
b.max = max
b.count = uint(count)
b.data = data
return int64(int(len) + 2*binary.Size(uint8(0)) + 2*binary.Size(uint64(0))), nil
}
// GobEncode implements gob.GobEncoder interface.
func (b *Buckets) GobEncode() ([]byte, error) {
var buf bytes.Buffer
_, err := b.WriteTo(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode implements gob.GobDecoder interface.
func (b *Buckets) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
_, err := b.ReadFrom(buf)
return err
}
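
A short sketch of how the bit-packed buckets behave (illustrative values): ten 4-bit buckets fit in five bytes, and writes clamp to the [0, 15] range.

package main

import (
	"fmt"

	"github.com/owen-d/BoomFilters/boom"
)

func main() {
	b := boom.NewBuckets(10, 4) // 10 buckets * 4 bits -> 5 bytes of storage
	b.Set(3, 9)
	b.Increment(3, 10) // 9+10 clamps to the 4-bit max of 15
	b.Increment(7, -1) // 0-1 clamps to 0
	fmt.Println(b.Get(3), b.Get(7), b.MaxBucketValue()) // 15 0 15
}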

@ -0,0 +1,202 @@
package boom
import (
"bytes"
"encoding/binary"
"hash"
"hash/fnv"
"io"
"math"
)
// BloomFilter implements a classic Bloom filter. A Bloom filter has a non-zero
// probability of false positives and a zero probability of false negatives.
type BloomFilter struct {
buckets *Buckets // filter data
hash hash.Hash64 // hash function (kernel for all k functions)
m uint // filter size
k uint // number of hash functions
count uint // number of items added
}
// NewBloomFilter creates a new Bloom filter optimized to store n items with a
// specified target false-positive rate.
func NewBloomFilter(n uint, fpRate float64) *BloomFilter {
m := OptimalM(n, fpRate)
return &BloomFilter{
buckets: NewBuckets(m, 1),
hash: fnv.New64(),
m: m,
k: OptimalK(fpRate),
}
}
// Capacity returns the Bloom filter capacity, m.
func (b *BloomFilter) Capacity() uint {
return b.m
}
// K returns the number of hash functions.
func (b *BloomFilter) K() uint {
return b.k
}
// Count returns the number of items added to the filter.
func (b *BloomFilter) Count() uint {
return b.count
}
// EstimatedFillRatio returns the current estimated ratio of set bits.
func (b *BloomFilter) EstimatedFillRatio() float64 {
return 1 - math.Exp((-float64(b.count)*float64(b.k))/float64(b.m))
}
// FillRatio returns the ratio of set bits.
func (b *BloomFilter) FillRatio() float64 {
sum := uint32(0)
for i := uint(0); i < b.buckets.Count(); i++ {
sum += b.buckets.Get(i)
}
return float64(sum) / float64(b.m)
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives but a zero probability of false
// negatives.
func (b *BloomFilter) Test(data []byte) bool {
lower, upper := hashKernel(data, b.hash)
// If any of the K bits are not set, then it's not a member.
for i := uint(0); i < b.k; i++ {
if b.buckets.Get((uint(lower)+uint(upper)*i)%b.m) == 0 {
return false
}
}
return true
}
// Add will add the data to the Bloom filter. It returns the filter to allow
// for chaining.
func (b *BloomFilter) Add(data []byte) Filter {
lower, upper := hashKernel(data, b.hash)
// Set the K bits.
for i := uint(0); i < b.k; i++ {
b.buckets.Set((uint(lower)+uint(upper)*i)%b.m, 1)
}
b.count++
return b
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not.
func (b *BloomFilter) TestAndAdd(data []byte) bool {
lower, upper := hashKernel(data, b.hash)
member := true
// If any of the K bits are not set, then it's not a member.
for i := uint(0); i < b.k; i++ {
idx := (uint(lower) + uint(upper)*i) % b.m
if b.buckets.Get(idx) == 0 {
member = false
}
b.buckets.Set(idx, 1)
}
b.count++
return member
}
// Reset restores the Bloom filter to its original state. It returns the filter
// to allow for chaining.
func (b *BloomFilter) Reset() *BloomFilter {
b.buckets.Reset()
return b
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (b *BloomFilter) SetHash(h hash.Hash64) {
b.hash = h
}
// WriteTo writes a binary representation of the BloomFilter to an i/o stream.
// It returns the number of bytes written.
func (b *BloomFilter) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, uint64(b.count))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(b.m))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(b.k))
if err != nil {
return 0, err
}
writtenSize, err := b.buckets.WriteTo(stream)
if err != nil {
return 0, err
}
return writtenSize + int64(3*binary.Size(uint64(0))), err
}
// ReadFrom reads a binary representation of BloomFilter (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (b *BloomFilter) ReadFrom(stream io.Reader) (int64, error) {
var count, m, k uint64
var buckets Buckets
err := binary.Read(stream, binary.BigEndian, &count)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &m)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &k)
if err != nil {
return 0, err
}
readSize, err := buckets.ReadFrom(stream)
if err != nil {
return 0, err
}
b.count = uint(count)
b.m = uint(m)
b.k = uint(k)
b.buckets = &buckets
return readSize + int64(3*binary.Size(uint64(0))), nil
}
// GobEncode implements gob.GobEncoder interface.
func (b *BloomFilter) GobEncode() ([]byte, error) {
var buf bytes.Buffer
_, err := b.WriteTo(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode implements gob.GobDecoder interface.
func (b *BloomFilter) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
_, err := b.ReadFrom(buf)
if b.hash == nil {
b.hash = fnv.New64()
}
return err
}

@ -0,0 +1,158 @@
package boom
import (
"hash"
"hash/fnv"
)
// CountingBloomFilter implements a Counting Bloom Filter as described by Fan,
// Cao, Almeida, and Broder in Summary Cache: A Scalable Wide-Area Web Cache
// Sharing Protocol:
//
// http://pages.cs.wisc.edu/~jussara/papers/00ton.pdf
//
// A Counting Bloom Filter (CBF) provides a way to remove elements by using an
// array of n-bit buckets. When an element is added, the respective buckets are
// incremented. To remove an element, the respective buckets are decremented. A
// query checks that each of the respective buckets are non-zero. Because CBFs
// allow elements to be removed, they introduce a non-zero probability of false
// negatives in addition to the possibility of false positives.
//
// Counting Bloom Filters are useful for cases where elements are both added
// and removed from the data set. Since they use n-bit buckets, CBFs use
// roughly n-times more memory than traditional Bloom filters.
type CountingBloomFilter struct {
buckets *Buckets // filter data
hash hash.Hash64 // hash function (kernel for all k functions)
m uint // number of buckets
k uint // number of hash functions
count uint // number of items in the filter
indexBuffer []uint // buffer used to cache indices
}
// NewCountingBloomFilter creates a new Counting Bloom Filter optimized to
// store n items with a specified target false-positive rate and bucket size.
// If you don't know how many bits to use for buckets, use
// NewDefaultCountingBloomFilter for a sensible default.
func NewCountingBloomFilter(n uint, b uint8, fpRate float64) *CountingBloomFilter {
var (
m = OptimalM(n, fpRate)
k = OptimalK(fpRate)
)
return &CountingBloomFilter{
buckets: NewBuckets(m, b),
hash: fnv.New64(),
m: m,
k: k,
indexBuffer: make([]uint, k),
}
}
// NewDefaultCountingBloomFilter creates a new Counting Bloom Filter optimized
// to store n items with a specified target false-positive rate. Buckets are
// allocated four bits.
func NewDefaultCountingBloomFilter(n uint, fpRate float64) *CountingBloomFilter {
return NewCountingBloomFilter(n, 4, fpRate)
}
// Capacity returns the Bloom filter capacity, m.
func (c *CountingBloomFilter) Capacity() uint {
return c.m
}
// K returns the number of hash functions.
func (c *CountingBloomFilter) K() uint {
return c.k
}
// Count returns the number of items in the filter.
func (c *CountingBloomFilter) Count() uint {
return c.count
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives and false negatives.
func (c *CountingBloomFilter) Test(data []byte) bool {
lower, upper := hashKernel(data, c.hash)
// If any of the K bits are not set, then it's not a member.
for i := uint(0); i < c.k; i++ {
if c.buckets.Get((uint(lower)+uint(upper)*i)%c.m) == 0 {
return false
}
}
return true
}
// Add will add the data to the Bloom filter. It returns the filter to allow
// for chaining.
func (c *CountingBloomFilter) Add(data []byte) Filter {
lower, upper := hashKernel(data, c.hash)
// Set the K bits.
for i := uint(0); i < c.k; i++ {
c.buckets.Increment((uint(lower)+uint(upper)*i)%c.m, 1)
}
c.count++
return c
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not.
func (c *CountingBloomFilter) TestAndAdd(data []byte) bool {
lower, upper := hashKernel(data, c.hash)
member := true
// If any of the K bits are not set, then it's not a member.
for i := uint(0); i < c.k; i++ {
idx := (uint(lower) + uint(upper)*i) % c.m
if c.buckets.Get(idx) == 0 {
member = false
}
c.buckets.Increment(idx, 1)
}
c.count++
return member
}
// TestAndRemove will test for membership of the data and remove it from the
// filter if it exists. Returns true if the data was a member, false if not.
func (c *CountingBloomFilter) TestAndRemove(data []byte) bool {
lower, upper := hashKernel(data, c.hash)
member := true
// Set the K bits.
for i := uint(0); i < c.k; i++ {
c.indexBuffer[i] = (uint(lower) + uint(upper)*i) % c.m
if c.buckets.Get(c.indexBuffer[i]) == 0 {
member = false
}
}
if member {
for _, idx := range c.indexBuffer {
c.buckets.Increment(idx, -1)
}
c.count--
}
return member
}
// Reset restores the Bloom filter to its original state. It returns the filter
// to allow for chaining.
func (c *CountingBloomFilter) Reset() *CountingBloomFilter {
c.buckets.Reset()
c.count = 0
return c
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (c *CountingBloomFilter) SetHash(h hash.Hash64) {
c.hash = h
}
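// Editor's note: the following usage sketch is not part of the upstream
// BoomFilters code; it is added for illustration only and uses nothing beyond
// the exported API defined above. The sample key and parameters are invented.
func exampleCountingBloomFilterUsage() {
	// Sized for ~1000 items at a 1% target false-positive rate with 4-bit buckets.
	cbf := NewDefaultCountingBloomFilter(1000, 0.01)
	cbf.Add([]byte("user:42"))
	_ = cbf.Test([]byte("user:42"))          // expected true (probabilistic)
	_ = cbf.TestAndRemove([]byte("user:42")) // decrements the buckets if present
	_ = cbf.Test([]byte("user:42"))          // now expected false
}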

@ -0,0 +1,266 @@
package boom
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash"
"hash/fnv"
"io"
"math"
)
// CountMinSketch implements a Count-Min Sketch as described by Cormode and
// Muthukrishnan in An Improved Data Stream Summary: The Count-Min Sketch and
// its Applications:
//
// http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf
//
// A Count-Min Sketch (CMS) is a probabilistic data structure which
// approximates the frequency of events in a data stream. Unlike a hash map, a
// CMS uses sub-linear space at the expense of a configurable error factor.
// Similar to Counting Bloom filters, items are hashed to a series of buckets,
// which increment a counter. The frequency of an item is estimated by taking
// the minimum of each of the item's respective counter values.
//
// Count-Min Sketches are useful for counting the frequency of events in
// massive data sets or unbounded streams online. In these situations, storing
// the entire data set or allocating counters for every event in memory is
// impractical. It may be possible for offline processing, but real-time
// processing requires fast, space-efficient solutions like the CMS. For
// approximating set cardinality, refer to the HyperLogLog.
type CountMinSketch struct {
matrix [][]uint64 // count matrix
width uint // matrix width
depth uint // matrix depth
count uint64 // number of items added
epsilon float64 // relative-accuracy factor
delta float64 // relative-accuracy probability
hash hash.Hash64 // hash function (kernel for all depth functions)
}
// NewCountMinSketch creates a new Count-Min Sketch whose relative accuracy is
// within a factor of epsilon with probability delta. Both of these parameters
// affect the space and time complexity.
func NewCountMinSketch(epsilon, delta float64) *CountMinSketch {
var (
width = uint(math.Ceil(math.E / epsilon))
depth = uint(math.Ceil(math.Log(1 / delta)))
matrix = make([][]uint64, depth)
)
for i := uint(0); i < depth; i++ {
matrix[i] = make([]uint64, width)
}
return &CountMinSketch{
matrix: matrix,
width: width,
depth: depth,
epsilon: epsilon,
delta: delta,
hash: fnv.New64(),
}
}
// Epsilon returns the relative-accuracy factor, epsilon.
func (c *CountMinSketch) Epsilon() float64 {
return c.epsilon
}
// Delta returns the relative-accuracy probability, delta.
func (c *CountMinSketch) Delta() float64 {
return c.delta
}
// TotalCount returns the number of items added to the sketch.
func (c *CountMinSketch) TotalCount() uint64 {
return c.count
}
// Add will add the data to the set. Returns the CountMinSketch to allow for
// chaining.
func (c *CountMinSketch) Add(data []byte) *CountMinSketch {
lower, upper := hashKernel(data, c.hash)
// Increment count in each row.
for i := uint(0); i < c.depth; i++ {
c.matrix[i][(uint(lower)+uint(upper)*i)%c.width]++
}
c.count++
return c
}
// Count returns the approximate count for the specified item, correct within
// epsilon * total count with a probability of delta.
func (c *CountMinSketch) Count(data []byte) uint64 {
var (
lower, upper = hashKernel(data, c.hash)
count = uint64(math.MaxUint64)
)
for i := uint(0); i < c.depth; i++ {
count = uint64(math.Min(float64(count),
float64(c.matrix[i][(uint(lower)+uint(upper)*i)%c.width])))
}
return count
}
// Merge combines this CountMinSketch with another. Returns an error if the
// matrix width and depth are not equal.
func (c *CountMinSketch) Merge(other *CountMinSketch) error {
if c.depth != other.depth {
return errors.New("matrix depth must match")
}
if c.width != other.width {
return errors.New("matrix width must match")
}
for i := uint(0); i < c.depth; i++ {
for j := uint(0); j < c.width; j++ {
c.matrix[i][j] += other.matrix[i][j]
}
}
c.count += other.count
return nil
}
// Reset restores the CountMinSketch to its original state. It returns itself
// to allow for chaining.
func (c *CountMinSketch) Reset() *CountMinSketch {
for i := 0; i < len(c.matrix); i++ {
for j := 0; j < len(c.matrix[i]); j++ {
c.matrix[i][j] = 0
}
}
c.count = 0
return c
}
// SetHash sets the hashing function used.
func (c *CountMinSketch) SetHash(h hash.Hash64) {
c.hash = h
}
// WriteDataTo writes a binary representation of the CMS data to
// an io stream. It returns the number of bytes written and any error encountered.
func (c *CountMinSketch) WriteDataTo(stream io.Writer) (int, error) {
buf := new(bytes.Buffer)
// serialize epsilon and delta as cms configuration check
err := binary.Write(buf, binary.LittleEndian, c.epsilon)
if err != nil {
return 0, err
}
err = binary.Write(buf, binary.LittleEndian, c.delta)
if err != nil {
return 0, err
}
err = binary.Write(buf, binary.LittleEndian, c.count)
if err != nil {
return 0, err
}
// encode matrix
for i := range c.matrix {
err = binary.Write(buf, binary.LittleEndian, c.matrix[i])
if err != nil {
return 0, err
}
}
return stream.Write(buf.Bytes())
}
// ReadDataFrom reads a binary representation of the CMS data written
// by WriteDataTo() from an io stream. It returns the number of bytes read
// and any error encountered.
// If the serialized CMS configuration does not match this sketch, it returns an error with the expected parameters.
func (c *CountMinSketch) ReadDataFrom(stream io.Reader) (int, error) {
var (
count uint64
epsilon, delta float64
)
err := binary.Read(stream, binary.LittleEndian, &epsilon)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.LittleEndian, &delta)
if err != nil {
return 0, err
}
// the serialized and target CMS configurations must match
if c.epsilon != epsilon || c.delta != delta {
return 0, fmt.Errorf("expected cms values for epsilon %f and delta %f", epsilon, delta)
}
err = binary.Read(stream, binary.LittleEndian, &count)
if err != nil {
return 0, err
}
for i := uint(0); i < c.depth; i++ {
err = binary.Read(stream, binary.LittleEndian, c.matrix[i])
if err != nil {
return 0, err
}
}
// total size: matrix data plus the count, epsilon, and delta headers
size := int(c.depth*c.width)*binary.Size(uint64(0)) + binary.Size(count) + 2*binary.Size(float64(0))
c.count = count
return size, err
}
// TestAndRemove attempts to remove n counts of data from the CMS. If
// n is greater than the data count, TestAndRemove is a no-op and
// returns false. Otherwise, it decrements the count by n and returns true.
func (c *CountMinSketch) TestAndRemove(data []byte, n uint64) bool {
h, count := c.traverseDepth(data)
if n > count {
return false
}
for i := uint(0); i < c.depth; i++ {
*h[i] -= n
}
return true
}
// TestAndRemoveAll counts data frequency, performs TestAndRemove(data, count),
// and returns true if count is positive. If count is 0, TestAndRemoveAll is a
// no-op and returns false.
func (c *CountMinSketch) TestAndRemoveAll(data []byte) bool {
h, count := c.traverseDepth(data)
if count == 0 {
return false
}
for i := uint(0); i < c.depth; i++ {
*h[i] -= count
}
return true
}
func (c *CountMinSketch) traverseDepth(data []byte) ([]*uint64, uint64) {
var (
lower, upper = hashKernel(data, c.hash)
count = uint64(math.MaxUint64)
h = make([]*uint64, c.depth)
)
for i := uint(0); i < c.depth; i++ {
h[i] = &c.matrix[i][(uint(lower)+uint(upper)*i)%c.width]
count = uint64(math.Min(float64(count), float64(*h[i])))
}
return h, count
}
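// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The keys are invented; epsilon and delta are
// example values that trade accuracy for space (see NewCountMinSketch).
func exampleCountMinSketchUsage() {
	// ~0.1% error factor with a small failure probability.
	cms := NewCountMinSketch(0.001, 0.01)
	for i := 0; i < 3; i++ {
		cms.Add([]byte("GET /index"))
	}
	cms.Add([]byte("GET /health"))
	_ = cms.Count([]byte("GET /index")) // ~3; may overestimate, never underestimates
	_ = cms.TotalCount()                // 4 items added in total
}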

@ -0,0 +1,269 @@
package boom
import (
"bytes"
"encoding/binary"
"errors"
"hash"
"hash/fnv"
"math"
"math/rand"
)
// maxNumKicks is the maximum number of relocations to attempt when inserting
// an element before considering the filter full.
const maxNumKicks = 500
// bucket consists of a set of []byte entries.
type bucket [][]byte
// contains indicates if the given fingerprint is contained in one of the
// bucket's entries.
func (b bucket) contains(f []byte) bool {
return b.indexOf(f) != -1
}
// indexOf returns the entry index of the given fingerprint or -1 if it's not
// in the bucket.
func (b bucket) indexOf(f []byte) int {
for i, fingerprint := range b {
if bytes.Equal(f, fingerprint) {
return i
}
}
return -1
}
// getEmptyEntry returns the index of the next available entry in the bucket or
// an error if it's full.
func (b bucket) getEmptyEntry() (int, error) {
for i, fingerprint := range b {
if fingerprint == nil {
return i, nil
}
}
return -1, errors.New("full")
}
// CuckooFilter implements a Cuckoo Bloom filter as described by Fan, Andersen,
// Kaminsky, and Mitzenmacher in Cuckoo Filter: Practically Better Than Bloom:
//
// http://www.pdl.cmu.edu/PDL-FTP/FS/cuckoo-conext2014.pdf
//
// A Cuckoo Filter is a Bloom filter variation which provides support for
// removing elements without significantly degrading space and performance. It
// works by using a cuckoo hashing scheme for inserting items. Instead of
// storing the elements themselves, it stores their fingerprints which also
// allows for item removal without false negatives (if you don't attempt to
// remove an item not contained in the filter).
//
// For applications that store many items and target moderately low
// false-positive rates, cuckoo filters have lower space overhead than
// space-optimized Bloom filters.
type CuckooFilter struct {
buckets []bucket
hash hash.Hash32 // hash function (used for fingerprint and hash)
m uint // number of buckets
b uint // number of entries per bucket
f uint // length of fingerprints (in bytes)
count uint // number of items in the filter
n uint // filter capacity
}
// NewCuckooFilter creates a new Cuckoo Bloom filter optimized to store n items
// with a specified target false-positive rate.
func NewCuckooFilter(n uint, fpRate float64) *CuckooFilter {
var (
b = uint(4)
f = calculateF(b, fpRate)
m = power2(n / f * 8)
buckets = make([]bucket, m)
)
for i := uint(0); i < m; i++ {
buckets[i] = make(bucket, b)
}
return &CuckooFilter{
buckets: buckets,
hash: fnv.New32(),
m: m,
b: b,
f: f,
n: n,
}
}
// Buckets returns the number of buckets.
func (c *CuckooFilter) Buckets() uint {
return c.m
}
// Capacity returns the number of items the filter can store.
func (c *CuckooFilter) Capacity() uint {
return c.n
}
// Count returns the number of items in the filter.
func (c *CuckooFilter) Count() uint {
return c.count
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives.
func (c *CuckooFilter) Test(data []byte) bool {
i1, i2, f := c.components(data)
// If either bucket contains f, it's a member.
return c.buckets[i1%c.m].contains(f) || c.buckets[i2%c.m].contains(f)
}
// Add will add the data to the Cuckoo Filter. It returns an error if the
// filter is full, i.e. the maximum number of relocations was exceeded. In that
// case the last displaced fingerprint is dropped, which introduces a
// possibility of false negatives. To avoid this, use Count and Capacity to
// check whether the filter is full before adding an item.
func (c *CuckooFilter) Add(data []byte) error {
return c.add(c.components(data))
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not. An error is returned if the filter is
// full, i.e. the maximum number of relocations was exceeded, in which case the
// last displaced fingerprint is dropped and false negatives become possible.
// To avoid this, use Count and Capacity to check whether the filter is full
// before adding an item.
func (c *CuckooFilter) TestAndAdd(data []byte) (bool, error) {
i1, i2, f := c.components(data)
// If either bucket contains f, it's a member.
if c.buckets[i1%c.m].contains(f) || c.buckets[i2%c.m].contains(f) {
return true, nil
}
return false, c.add(i1, i2, f)
}
// TestAndRemove will test for membership of the data and remove it from the
// filter if it exists. Returns true if the data was a member, false if not.
func (c *CuckooFilter) TestAndRemove(data []byte) bool {
i1, i2, f := c.components(data)
// Try to remove from bucket[i1].
b1 := c.buckets[i1%c.m]
if idx := b1.indexOf(f); idx != -1 {
b1[idx] = nil
c.count--
return true
}
// Try to remove from bucket[i2].
b2 := c.buckets[i2%c.m]
if idx := b2.indexOf(f); idx != -1 {
b2[idx] = nil
c.count--
return true
}
return false
}
// Reset restores the Bloom filter to its original state. It returns the filter
// to allow for chaining.
func (c *CuckooFilter) Reset() *CuckooFilter {
buckets := make([]bucket, c.m)
for i := uint(0); i < c.m; i++ {
buckets[i] = make(bucket, c.b)
}
c.buckets = buckets
c.count = 0
return c
}
// add will insert the fingerprint into the filter returning an error if the
// filter is full.
func (c *CuckooFilter) add(i1, i2 uint, f []byte) error {
// Try to insert into bucket[i1].
b1 := c.buckets[i1%c.m]
if idx, err := b1.getEmptyEntry(); err == nil {
b1[idx] = f
c.count++
return nil
}
// Try to insert into bucket[i2].
b2 := c.buckets[i2%c.m]
if idx, err := b2.getEmptyEntry(); err == nil {
b2[idx] = f
c.count++
return nil
}
// Must relocate existing items.
i := i1
for n := 0; n < maxNumKicks; n++ {
bucketIdx := i % c.m
entryIdx := rand.Intn(int(c.b))
f, c.buckets[bucketIdx][entryIdx] = c.buckets[bucketIdx][entryIdx], f
i = i ^ uint(binary.BigEndian.Uint32(c.computeHash(f)))
b := c.buckets[i%c.m]
if idx, err := b.getEmptyEntry(); err == nil {
b[idx] = f
c.count++
return nil
}
}
return errors.New("full")
}
// components returns the two hash values used to index into the buckets and
// the fingerprint for the given element.
func (c *CuckooFilter) components(data []byte) (uint, uint, []byte) {
var (
hash = c.computeHash(data)
f = hash[0:c.f]
i1 = uint(binary.BigEndian.Uint32(hash))
i2 = i1 ^ uint(binary.BigEndian.Uint32(c.computeHash(f)))
)
return i1, i2, f
}
// computeHash returns a 32-bit hash value for the given data.
func (c *CuckooFilter) computeHash(data []byte) []byte {
c.hash.Write(data)
hash := c.hash.Sum(nil)
c.hash.Reset()
return hash
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (c *CuckooFilter) SetHash(h hash.Hash32) {
c.hash = h
}
// calculateF returns the optimal fingerprint length in bytes for the given
// bucket size and false-positive rate epsilon.
func calculateF(b uint, epsilon float64) uint {
f := uint(math.Ceil(math.Log(2 * float64(b) / epsilon)))
f = f / 8
if f == 0 {
f = 1
}
return f
}
// power2 calculates the next power of two for the given value.
func power2(x uint) uint {
x--
x |= x >> 1
x |= x >> 2
x |= x >> 4
x |= x >> 8
x |= x >> 16
x |= x >> 32
x++
return x
}
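// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The key is invented.
func exampleCuckooFilterUsage() {
	cf := NewCuckooFilter(10000, 0.01)
	if err := cf.Add([]byte("session:abc")); err != nil {
		// Add reports an error once relocations are exhausted (the filter is full).
		return
	}
	_ = cf.Test([]byte("session:abc"))          // expected true
	_ = cf.TestAndRemove([]byte("session:abc")) // true, and deletes the fingerprint
}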

@ -0,0 +1,168 @@
package boom
import (
"hash"
"hash/fnv"
)
// DeletableBloomFilter implements a Deletable Bloom Filter as described by
// Rothenberg, Macapuna, Verdi, Magalhaes in The Deletable Bloom filter - A new
// member of the Bloom family:
//
// http://arxiv.org/pdf/1005.0352.pdf
//
// A Deletable Bloom Filter compactly stores information on collisions when
// inserting elements. This information is used to determine if elements are
// deletable. This design enables false-negative-free deletions at a fraction
// of the cost in memory consumption.
//
// Deletable Bloom Filters are useful for cases which require removing elements
// but cannot allow false negatives. This means they can be safely swapped in
// place of traditional Bloom filters.
type DeletableBloomFilter struct {
buckets *Buckets // filter data
collisions *Buckets // filter collision data
hash hash.Hash64 // hash function (kernel for all k functions)
m uint // filter size
regionSize uint // number of bits in a region
k uint // number of hash functions
count uint // number of items added
indexBuffer []uint // buffer used to cache indices
}
// NewDeletableBloomFilter creates a new DeletableBloomFilter optimized to
// store n items with a specified target false-positive rate. The r value
// determines the number of bits to use to store collision information. This
// controls the deletability of an element. Refer to the paper for selecting an
// optimal value.
func NewDeletableBloomFilter(n, r uint, fpRate float64) *DeletableBloomFilter {
var (
m = OptimalM(n, fpRate)
k = OptimalK(fpRate)
)
return &DeletableBloomFilter{
buckets: NewBuckets(m-r, 1),
collisions: NewBuckets(r+1, 1),
hash: fnv.New64(),
m: m - r,
regionSize: (m - r) / r,
k: k,
indexBuffer: make([]uint, k),
}
}
// Capacity returns the Bloom filter capacity, m.
func (d *DeletableBloomFilter) Capacity() uint {
return d.m
}
// K returns the number of hash functions.
func (d *DeletableBloomFilter) K() uint {
return d.k
}
// Count returns the number of items added to the filter.
func (d *DeletableBloomFilter) Count() uint {
return d.count
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives but a zero probability of false
// negatives.
func (d *DeletableBloomFilter) Test(data []byte) bool {
lower, upper := hashKernel(data, d.hash)
// If any of the K bits are not set, then it's not a member.
for i := uint(0); i < d.k; i++ {
if d.buckets.Get((uint(lower)+uint(upper)*i)%d.m) == 0 {
return false
}
}
return true
}
// Add will add the data to the Bloom filter. It returns the filter to allow
// for chaining.
func (d *DeletableBloomFilter) Add(data []byte) Filter {
lower, upper := hashKernel(data, d.hash)
// Set the K bits.
for i := uint(0); i < d.k; i++ {
idx := (uint(lower) + uint(upper)*i) % d.m
if d.buckets.Get(idx) != 0 {
// Collision, set corresponding region bit.
d.collisions.Set(idx/d.regionSize, 1)
} else {
d.buckets.Set(idx, 1)
}
}
d.count++
return d
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not.
func (d *DeletableBloomFilter) TestAndAdd(data []byte) bool {
lower, upper := hashKernel(data, d.hash)
member := true
// If any of the K bits are not set, then it's not a member.
for i := uint(0); i < d.k; i++ {
idx := (uint(lower) + uint(upper)*i) % d.m
if d.buckets.Get(idx) == 0 {
member = false
} else {
// Collision, set corresponding region bit.
d.collisions.Set(idx/d.regionSize, 1)
}
d.buckets.Set(idx, 1)
}
d.count++
return member
}
// TestAndRemove will test for membership of the data and remove it from the
// filter if it exists. Returns true if the data was a member, false if not.
func (d *DeletableBloomFilter) TestAndRemove(data []byte) bool {
lower, upper := hashKernel(data, d.hash)
member := true
// Set the K bits.
for i := uint(0); i < d.k; i++ {
d.indexBuffer[i] = (uint(lower) + uint(upper)*i) % d.m
if d.buckets.Get(d.indexBuffer[i]) == 0 {
member = false
}
}
if member {
for _, idx := range d.indexBuffer {
if d.collisions.Get(idx/d.regionSize) == 0 {
// Clear only bits located in collision-free zones.
d.buckets.Set(idx, 0)
}
}
d.count--
}
return member
}
// Reset restores the Bloom filter to its original state. It returns the filter
// to allow for chaining.
func (d *DeletableBloomFilter) Reset() *DeletableBloomFilter {
d.buckets.Reset()
d.collisions.Reset()
d.count = 0
return d
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (d *DeletableBloomFilter) SetHash(h hash.Hash64) {
d.hash = h
}
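// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The key is invented; n, r, and the
// false-positive rate are example values only.
func exampleDeletableBloomFilterUsage() {
	// n=10000 expected items, r=100 collision regions, 1% target false-positive rate.
	dbf := NewDeletableBloomFilter(10000, 100, 0.01)
	dbf.Add([]byte("trace-id-1"))
	_ = dbf.Test([]byte("trace-id-1"))          // expected true
	_ = dbf.TestAndRemove([]byte("trace-id-1")) // true; only collision-free bits are cleared
}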

@ -0,0 +1,253 @@
/*
Original work Copyright 2013 Eric Lesh
Modified work Copyright 2015 Tyler Treat
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
*/
package boom
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash"
"hash/fnv"
"io"
"math"
)
var exp32 = math.Pow(2, 32)
// HyperLogLog implements the HyperLogLog cardinality estimation algorithm as
// described by Flajolet, Fusy, Gandouet, and Meunier in HyperLogLog: the
// analysis of a near-optimal cardinality estimation algorithm:
//
// http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
//
// HyperLogLog is a probabilistic algorithm which approximates the number of
// distinct elements in a multiset. It works by hashing values and calculating
// the maximum number of leading zeros in the binary representation of each
// hash. If the maximum number of leading zeros is n, the estimated number of
// distinct elements in the set is 2^n. To minimize variance, the multiset is
// split into a configurable number of registers, the maximum number of leading
// zeros is calculated in the numbers in each register, and a harmonic mean is
// used to combine the estimates.
//
// For large or unbounded data sets, calculating the exact cardinality is
// impractical. HyperLogLog uses a fraction of the memory while providing an
// accurate approximation. For counting element frequency, refer to the
// Count-Min Sketch.
type HyperLogLog struct {
registers []uint8 // counter registers
m uint // number of registers
b uint32 // number of bits to calculate register
alpha float64 // bias-correction constant
hash hash.Hash32 // hash function
}
// NewHyperLogLog creates a new HyperLogLog with m registers. Returns an error
// if m isn't a power of two.
func NewHyperLogLog(m uint) (*HyperLogLog, error) {
if (m & (m - 1)) != 0 {
return nil, errors.New("m must be a power of two")
}
return &HyperLogLog{
registers: make([]uint8, m),
m: m,
b: uint32(math.Ceil(math.Log2(float64(m)))),
alpha: calculateAlpha(m),
hash: fnv.New32(),
}, nil
}
// NewDefaultHyperLogLog creates a new HyperLogLog optimized for the specified
// standard error. Returns an error if the number of registers can't be
// calculated for the provided accuracy.
func NewDefaultHyperLogLog(e float64) (*HyperLogLog, error) {
m := math.Pow(1.04/e, 2)
return NewHyperLogLog(uint(math.Pow(2, math.Ceil(math.Log2(m)))))
}
// Add will add the data to the set. Returns the HyperLogLog to allow for
// chaining.
func (h *HyperLogLog) Add(data []byte) *HyperLogLog {
var (
hash = h.calculateHash(data)
k = 32 - h.b
r = calculateRho(hash<<h.b, k)
j = hash >> uint(k)
)
if r > h.registers[j] {
h.registers[j] = r
}
return h
}
// Count returns the approximated cardinality of the set.
func (h *HyperLogLog) Count() uint64 {
sum := 0.0
m := float64(h.m)
for _, val := range h.registers {
sum += 1.0 / math.Pow(2.0, float64(val))
}
estimate := h.alpha * m * m / sum
if estimate <= 5.0/2.0*m {
// Small range correction
v := 0
for _, r := range h.registers {
if r == 0 {
v++
}
}
if v > 0 {
estimate = m * math.Log(m/float64(v))
}
} else if estimate > 1.0/30.0*exp32 {
// Large range correction
estimate = -exp32 * math.Log(1-estimate/exp32)
}
return uint64(estimate)
}
// Merge combines this HyperLogLog with another. Returns an error if the number
// of registers in the two HyperLogLogs are not equal.
func (h *HyperLogLog) Merge(other *HyperLogLog) error {
if h.m != other.m {
return errors.New("number of registers must match")
}
for j, r := range other.registers {
if r > h.registers[j] {
h.registers[j] = r
}
}
return nil
}
// Reset restores the HyperLogLog to its original state. It returns itself to
// allow for chaining.
func (h *HyperLogLog) Reset() *HyperLogLog {
h.registers = make([]uint8, h.m)
return h
}
// calculateHash calculates the 32-bit hash value for the provided data.
func (h *HyperLogLog) calculateHash(data []byte) uint32 {
h.hash.Write(data)
sum := h.hash.Sum32()
h.hash.Reset()
return sum
}
// SetHash sets the hashing function used.
func (h *HyperLogLog) SetHash(ha hash.Hash32) {
h.hash = ha
}
// calculateAlpha calculates the bias-correction constant alpha based on the
// number of registers, m.
func calculateAlpha(m uint) (result float64) {
switch m {
case 16:
result = 0.673
case 32:
result = 0.697
case 64:
result = 0.709
default:
result = 0.7213 / (1.0 + 1.079/float64(m))
}
return result
}
// calculateRho calculates the position of the leftmost 1-bit.
func calculateRho(val, max uint32) uint8 {
r := uint32(1)
for val&0x80000000 == 0 && r <= max {
r++
val <<= 1
}
return uint8(r)
}
// WriteDataTo writes a binary representation of the HLL data to
// an io stream. It returns the number of bytes written and any error encountered.
func (h *HyperLogLog) WriteDataTo(stream io.Writer) (n int, err error) {
buf := new(bytes.Buffer)
// write register number first
err = binary.Write(buf, binary.LittleEndian, uint64(h.m))
if err != nil {
return
}
err = binary.Write(buf, binary.LittleEndian, h.b)
if err != nil {
return
}
err = binary.Write(buf, binary.LittleEndian, h.alpha)
if err != nil {
return
}
err = binary.Write(buf, binary.LittleEndian, h.registers)
if err != nil {
return
}
n, err = stream.Write(buf.Bytes())
return
}
// ReadDataFrom reads a binary representation of the HLL data written
// by WriteDataTo() from an io stream. It returns the number of bytes read
// and any error encountered.
// If the serialized HLL configuration does not match this one, it returns an error with the expected parameters.
func (h *HyperLogLog) ReadDataFrom(stream io.Reader) (int, error) {
var m uint64
// read register number first
err := binary.Read(stream, binary.LittleEndian, &m)
if err != nil {
return 0, err
}
// the register count must match the one recorded in the serialized HLL
if uint64(h.m) != m {
return 0, fmt.Errorf("expected hll register number %d", m)
}
// set other values
err = binary.Read(stream, binary.LittleEndian, &h.b)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.LittleEndian, &h.alpha)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.LittleEndian, h.registers)
if err != nil {
return 0, err
}
// total size: register data plus the m, b, and alpha headers
size := int(h.m)*binary.Size(uint8(0)) + binary.Size(uint64(0)) + binary.Size(uint32(0)) + binary.Size(float64(0))
return size, err
}
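// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The inputs are invented.
func exampleHyperLogLogUsage() {
	// Roughly 2% standard error; NewDefaultHyperLogLog picks a power-of-two register count.
	hll, err := NewDefaultHyperLogLog(0.02)
	if err != nil {
		return
	}
	for _, v := range []string{"a", "b", "a", "c"} {
		hll.Add([]byte(v))
	}
	_ = hll.Count() // approximately 3 distinct elements
}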

@ -0,0 +1,269 @@
/*
Original work Copyright (c) 2012 Jeff Hodges. All rights reserved.
Modified work Copyright (c) 2015 Tyler Treat. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Jeff Hodges nor the names of this project's
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package boom
import (
"bytes"
"encoding/binary"
"encoding/gob"
"hash"
"hash/fnv"
"io"
"sync"
"sync/atomic"
"unsafe"
)
// InverseBloomFilter is a concurrent "inverse" Bloom filter, which is
// effectively the opposite of a classic Bloom filter. This was originally
// described and written by Jeff Hodges:
//
// http://www.somethingsimilar.com/2012/05/21/the-opposite-of-a-bloom-filter/
//
// The InverseBloomFilter may report a false negative but can never report a
// false positive. That is, it may report that an item has not been seen when
// it actually has, but it will never report an item as seen which it hasn't
// come across. This behaves in a similar manner to a fixed-size hashmap which
// does not handle conflicts.
//
// An example use case is deduplicating events while processing a stream of
// data. Ideally, duplicate events are relatively close together.
type InverseBloomFilter struct {
array []*[]byte
hashPool *sync.Pool
capacity uint
}
// NewInverseBloomFilter creates and returns a new InverseBloomFilter with the
// specified capacity.
func NewInverseBloomFilter(capacity uint) *InverseBloomFilter {
return &InverseBloomFilter{
array: make([]*[]byte, capacity),
hashPool: &sync.Pool{New: func() interface{} { return fnv.New32() }},
capacity: capacity,
}
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false negatives but a zero probability of false
// positives. That is, it may return false even though the data was added, but
// it will never return true for data that hasn't been added.
func (i *InverseBloomFilter) Test(data []byte) bool {
index := i.index(data)
indexPtr := (*unsafe.Pointer)(unsafe.Pointer(&i.array[index]))
val := (*[]byte)(atomic.LoadPointer(indexPtr))
if val == nil {
return false
}
return bytes.Equal(*val, data)
}
// Add will add the data to the filter. It returns the filter to allow for
// chaining.
func (i *InverseBloomFilter) Add(data []byte) Filter {
index := i.index(data)
i.getAndSet(index, data)
return i
}
// TestAndAdd is equivalent to calling Test followed by Add atomically. It
// returns true if the data is a member, false if not.
func (i *InverseBloomFilter) TestAndAdd(data []byte) bool {
oldID := i.getAndSet(i.index(data), data)
return bytes.Equal(oldID, data)
}
// Capacity returns the filter capacity.
func (i *InverseBloomFilter) Capacity() uint {
return i.capacity
}
// getAndSet returns the data that was in the slice at the given index after
// putting the new data in the slice at that index, atomically.
func (i *InverseBloomFilter) getAndSet(index uint32, data []byte) []byte {
indexPtr := (*unsafe.Pointer)(unsafe.Pointer(&i.array[index]))
keyUnsafe := unsafe.Pointer(&data)
var oldKey []byte
for {
oldKeyUnsafe := atomic.LoadPointer(indexPtr)
if atomic.CompareAndSwapPointer(indexPtr, oldKeyUnsafe, keyUnsafe) {
oldKeyPtr := (*[]byte)(oldKeyUnsafe)
if oldKeyPtr != nil {
oldKey = *oldKeyPtr
}
break
}
}
return oldKey
}
// index returns the array index for the given data.
func (i *InverseBloomFilter) index(data []byte) uint32 {
hash := i.hashPool.Get().(hash.Hash32)
hash.Write(data)
index := hash.Sum32() % uint32(i.capacity)
hash.Reset()
i.hashPool.Put(hash)
return index
}
// SetHashFactory sets the hashing function factory used in the filter.
func (i *InverseBloomFilter) SetHashFactory(h func() hash.Hash32) {
i.hashPool = &sync.Pool{New: func() interface{} { return h() }}
}
// WriteTo writes a binary representation of the InverseBloomFilter to an i/o stream.
// It returns the number of bytes written.
func (i *InverseBloomFilter) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, uint64(i.capacity))
if err != nil {
return 0, err
}
// Dereference all pointers to []byte
array := make([][]byte, int(i.capacity))
for b := range i.array {
if i.array[b] != nil {
array[b] = *i.array[b]
} else {
array[b] = nil
}
}
// Encode array into a []byte
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(array); err != nil {
return 0, err
}
serialized := buf.Bytes()
// Write the length of encoded slice
err = binary.Write(stream, binary.BigEndian, int64(len(serialized)))
if err != nil {
return 0, err
}
// Write the serialized bytes
written, err := stream.Write(serialized)
if err != nil {
return 0, err
}
return int64(written) + int64(2*binary.Size(uint64(0))), err
}
// ReadFrom reads a binary representation of InverseBloomFilter (such as might
// have been written by WriteTo()) from an i/o stream. ReadFrom replaces the
// array of its filter with the one read from disk. It returns the number
// of bytes read.
func (i *InverseBloomFilter) ReadFrom(stream io.Reader) (int64, error) {
decoded, capacity, size, err := i.decodeToArray(stream)
if err != nil {
return int64(0), err
}
// Create []*[]byte and point to each item in decoded
decodedWithPointers := make([]*[]byte, capacity)
for p := range decodedWithPointers {
if len(decoded[p]) == 0 {
decodedWithPointers[p] = nil
} else {
decodedWithPointers[p] = &decoded[p]
}
}
i.array = decodedWithPointers
i.capacity = uint(capacity)
return int64(size) + int64(2*binary.Size(uint64(0))), nil
}
// ImportElementsFrom reads a binary representation of an InverseBloomFilter
// (such as might have been written by WriteTo()) from an i/o stream and adds
// the decoded elements to this filter using the Add() method, skipping empty
// elements, if any. It returns the number of elements decoded from disk.
func (i *InverseBloomFilter) ImportElementsFrom(stream io.Reader) (int, error) {
decoded, _, _, err := i.decodeToArray(stream)
if err != nil {
return 0, err
}
// Create []*[]byte and point to each item in decoded
for p := range decoded {
if len(decoded[p]) > 0 {
i.Add(decoded[p])
}
}
return len(decoded), nil
}
// decodeToArray decodes an inverse bloom filter from an i/o stream into a 2-d byte slice.
func (i *InverseBloomFilter) decodeToArray(stream io.Reader) ([][]byte, uint64, uint64, error) {
var capacity, size uint64
err := binary.Read(stream, binary.BigEndian, &capacity)
if err != nil {
return nil, 0, 0, err
}
err = binary.Read(stream, binary.BigEndian, &size)
if err != nil {
return nil, 0, 0, err
}
// Read the encoded slice and decode into [][]byte
encoded := make([]byte, size)
if _, err := io.ReadFull(stream, encoded); err != nil {
return nil, 0, 0, err
}
buf := bytes.NewBuffer(encoded)
dec := gob.NewDecoder(buf)
decoded := make([][]byte, capacity)
if err := dec.Decode(&decoded); err != nil {
return nil, 0, 0, err
}
return decoded, capacity, size, nil
}
// GobEncode implements gob.GobEncoder interface.
func (i *InverseBloomFilter) GobEncode() ([]byte, error) {
var buf bytes.Buffer
_, err := i.WriteTo(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode implements gob.GobDecoder interface.
func (i *InverseBloomFilter) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
_, err := i.ReadFrom(buf)
return err
}
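// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The event IDs and capacity are invented.
func exampleInverseBloomFilterUsage() {
	// Near-duplicate suppression on a stream: false negatives are possible,
	// false positives are not.
	ibf := NewInverseBloomFilter(1 << 16)
	_ = ibf.TestAndAdd([]byte("event-1")) // false: first sighting
	_ = ibf.TestAndAdd([]byte("event-1")) // true: seen recently (unless evicted by a collision)
}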

@ -0,0 +1,104 @@
package boom
import (
"math"
"math/rand"
)
// MinHash is a variation of the technique for estimating similarity between
// two sets as presented by Broder in On the resemblance and containment of
// documents:
//
// http://gatekeeper.dec.com/ftp/pub/dec/SRC/publications/broder/positano-final-wpnums.pdf
//
// This can be used to cluster or compare documents by splitting the corpus
// into a bag of words. MinHash returns the approximated similarity ratio of
// the two bags. The similarity is less accurate for very small bags of words.
func MinHash(bag1, bag2 []string) float32 {
k := len(bag1) + len(bag2)
hashes := make([]int, k)
for i := 0; i < k; i++ {
a := uint(rand.Int())
b := uint(rand.Int())
c := uint(rand.Int())
x := computeHash(a*b*c, a, b, c)
hashes[i] = int(x)
}
bitMap := bitMap(bag1, bag2)
minHashValues := hashBuckets(2, k)
minHash(bag1, 0, minHashValues, bitMap, k, hashes)
minHash(bag2, 1, minHashValues, bitMap, k, hashes)
return similarity(minHashValues, k)
}
func minHash(bag []string, bagIndex int, minHashValues [][]int,
bitArray map[string][]bool, k int, hashes []int) {
index := 0
for element := range bitArray {
for i := 0; i < k; i++ {
if contains(bag, element) {
hindex := hashes[index]
if hindex < minHashValues[bagIndex][index] {
minHashValues[bagIndex][index] = hindex
}
}
}
index++
}
}
func contains(bag []string, element string) bool {
for _, e := range bag {
if e == element {
return true
}
}
return false
}
func bitMap(bag1, bag2 []string) map[string][]bool {
bitArray := map[string][]bool{}
for _, element := range bag1 {
bitArray[element] = []bool{true, false}
}
for _, element := range bag2 {
if _, ok := bitArray[element]; ok {
bitArray[element] = []bool{true, true}
} else {
bitArray[element] = []bool{false, true}
}
}
return bitArray
}
func hashBuckets(numSets, k int) [][]int {
minHashValues := make([][]int, numSets)
for i := 0; i < numSets; i++ {
minHashValues[i] = make([]int, k)
}
for i := 0; i < numSets; i++ {
for j := 0; j < k; j++ {
minHashValues[i][j] = math.MaxInt32
}
}
return minHashValues
}
func computeHash(x, a, b, u uint) uint {
return (a*x + b) >> (32 - u)
}
func similarity(minHashValues [][]int, k int) float32 {
identicalMinHashes := 0
for i := 0; i < k; i++ {
if minHashValues[0][i] == minHashValues[1][i] {
identicalMinHashes++
}
}
return (1.0 * float32(identicalMinHashes)) / float32(k)
}
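// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The word lists are invented; accuracy is low for
// bags this small.
func exampleMinHashUsage() {
	bag1 := []string{"the", "quick", "brown", "fox"}
	bag2 := []string{"the", "quick", "red", "fox"}
	_ = MinHash(bag1, bag2) // approximate similarity ratio in [0, 1]
}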

@ -0,0 +1,300 @@
/*
Original work Copyright (c) 2013 zhenjl
Modified work Copyright (c) 2015 Tyler Treat
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
*/
package boom
import (
"bytes"
"encoding/binary"
"hash"
"hash/fnv"
"io"
"math"
)
// PartitionedBloomFilter implements a variation of a classic Bloom filter as
// described by Almeida, Baquero, Preguica, and Hutchison in Scalable Bloom
// Filters:
//
// http://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
//
// This filter works by partitioning the M-sized bit array into k slices of
// size m = M/k bits. Each hash function produces an index over m for its
// respective slice. Thus, each element is described by exactly k bits, meaning
// the distribution of false positives is uniform across all elements.
type PartitionedBloomFilter struct {
partitions []*Buckets // partitioned filter data
hash hash.Hash64 // hash function (kernel for all k functions)
m uint // filter size (divided into k partitions)
k uint // number of hash functions (and partitions)
s uint // partition size (m / k)
estimatedCount uint // number of distinct items added
optimalCount uint // optimal number of distinct items that can be stored in this filter
}
// NewPartitionedBloomFilterWithCapacity creates a new partitioned Bloom filter
// with a specific capacity, m, and target false-positive rate.
func NewPartitionedBloomFilterWithCapacity(m uint, fpRate float64) *PartitionedBloomFilter {
var (
k = OptimalK(fpRate)
s = uint(math.Ceil(float64(m) / float64(k)))
)
partitions := make([]*Buckets, k)
for i := uint(0); i < k; i++ {
partitions[i] = NewBuckets(s, 1)
}
return &PartitionedBloomFilter{
partitions: partitions,
hash: fnv.New64(),
m: m,
k: k,
s: s,
optimalCount: estimatedCount(m, fillRatio),
}
}
// NewPartitionedBloomFilter creates a new partitioned Bloom filter optimized
// to store n items with a specified target false-positive rate.
func NewPartitionedBloomFilter(n uint, fpRate float64) *PartitionedBloomFilter {
m := OptimalM(n, fpRate)
return NewPartitionedBloomFilterWithCapacity(m, fpRate)
}
// Capacity returns the Bloom filter capacity, m.
func (p *PartitionedBloomFilter) Capacity() uint {
return p.m
}
// K returns the number of hash functions.
func (p *PartitionedBloomFilter) K() uint {
return p.k
}
// Count returns the number of items added to the filter.
func (p *PartitionedBloomFilter) Count() uint {
return p.estimatedCount
}
// EstimatedFillRatio returns the current estimated ratio of set bits.
func (p *PartitionedBloomFilter) EstimatedFillRatio() float64 {
return 1 - math.Exp(-float64(p.estimatedCount)/float64(p.s))
}
// FillRatio returns the average ratio of set bits across all partitions.
func (p *PartitionedBloomFilter) FillRatio() float64 {
t := float64(0)
for i := uint(0); i < p.k; i++ {
t += float64(p.partitions[i].PopCount()) / float64(p.s)
}
return t / float64(p.k)
}
// UpdateCount computes the current exact fill ratio and uses it to update the
// estimated count of distinct items via the formula from
// https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf:
//
// n ≈ −m ln(1 − p)
//
// Since duplicates can be added to a Bloom filter without setting new bits,
// this keeps the estimated count from drifting far from the true value.
// NOTE: this calls FillRatio, which computes the hamming weight across all
// bits and can be relatively expensive.
// It returns the current exact fill ratio.
func (p *PartitionedBloomFilter) UpdateCount() float64 {
fillRatio := p.FillRatio()
p.estimatedCount = estimatedCount(p.m, fillRatio)
return fillRatio
}
// OptimalCount returns the optimal number of distinct items that can be stored
// in this filter.
func (p *PartitionedBloomFilter) OptimalCount() uint {
return p.optimalCount
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives but a zero probability of false
// negatives. Due to the way the filter is partitioned, the probability of
// false positives is uniformly distributed across all elements.
func (p *PartitionedBloomFilter) Test(data []byte) bool {
lower, upper := hashKernel(data, p.hash)
// If any of the K partition bits are not set, then it's not a member.
for i := uint(0); i < p.k; i++ {
if p.partitions[i].Get((uint(lower)+uint(upper)*i)%p.s) == 0 {
return false
}
}
return true
}
// Add will add the data to the Bloom filter. It returns the filter to allow
// for chaining.
func (p *PartitionedBloomFilter) Add(data []byte) Filter {
lower, upper := hashKernel(data, p.hash)
// Set the K partition bits.
for i := uint(0); i < p.k; i++ {
p.partitions[i].Set((uint(lower)+uint(upper)*i)%p.s, 1)
}
p.estimatedCount++
return p
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not.
func (p *PartitionedBloomFilter) TestAndAdd(data []byte) bool {
lower, upper := hashKernel(data, p.hash)
member := true
// If any of the K partition bits are not set, then it's not a member.
for i := uint(0); i < p.k; i++ {
idx := (uint(lower) + uint(upper)*i) % p.s
if p.partitions[i].Get(idx) == 0 {
member = false
}
p.partitions[i].Set(idx, 1)
}
p.estimatedCount++
return member
}
// Reset restores the Bloom filter to its original state. It returns the filter
// to allow for chaining.
func (p *PartitionedBloomFilter) Reset() *PartitionedBloomFilter {
for _, partition := range p.partitions {
partition.Reset()
}
return p
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (p *PartitionedBloomFilter) SetHash(h hash.Hash64) {
p.hash = h
}
// WriteTo writes a binary representation of the PartitionedBloomFilter to an i/o stream.
// It returns the number of bytes written.
func (p *PartitionedBloomFilter) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, uint64(p.m))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(p.k))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(p.s))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(p.estimatedCount))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(p.optimalCount))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(len(p.partitions)))
if err != nil {
return 0, err
}
var numBytes int64
for _, partition := range p.partitions {
num, err := partition.WriteTo(stream)
if err != nil {
return 0, err
}
numBytes += num
}
// header: m, k, s, estimatedCount, optimalCount, and partition count (6 × uint64)
return numBytes + int64(6*binary.Size(uint64(0))), err
}
// ReadFrom reads a binary representation of PartitionedBloomFilter (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (p *PartitionedBloomFilter) ReadFrom(stream io.Reader) (int64, error) {
var m, k, s, estimatedCount, optimalCount, len uint64
err := binary.Read(stream, binary.BigEndian, &m)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &k)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &s)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &estimatedCount)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &optimalCount)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &len)
if err != nil {
return 0, err
}
var numBytes int64
partitions := make([]*Buckets, len)
for i := range partitions {
buckets := &Buckets{}
num, err := buckets.ReadFrom(stream)
if err != nil {
return 0, err
}
numBytes += num
partitions[i] = buckets
}
p.m = uint(m)
p.k = uint(k)
p.s = uint(s)
p.estimatedCount = uint(estimatedCount)
p.optimalCount = uint(optimalCount)
p.partitions = partitions
// header: m, k, s, estimatedCount, optimalCount, and partition count (6 × uint64)
return numBytes + int64(6*binary.Size(uint64(0))), nil
}
// GobEncode implements gob.GobEncoder interface.
func (p *PartitionedBloomFilter) GobEncode() ([]byte, error) {
var buf bytes.Buffer
_, err := p.WriteTo(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode implements gob.GobDecoder interface.
func (p *PartitionedBloomFilter) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
_, err := p.ReadFrom(buf)
return err
}
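// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The key and parameters are invented.
func examplePartitionedBloomFilterUsage() {
	pbf := NewPartitionedBloomFilter(10000, 0.01)
	pbf.Add([]byte("chunk-123"))
	_ = pbf.Test([]byte("chunk-123")) // expected true
	_ = pbf.EstimatedFillRatio()      // cheap estimate based on the insert count
	_ = pbf.UpdateCount()             // exact fill ratio; also corrects the estimated count
}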

@ -0,0 +1,326 @@
/*
Original work Copyright (c) 2013 zhenjl
Modified work Copyright (c) 2015 Tyler Treat
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
*/
package boom
import (
"bytes"
"encoding/binary"
"hash"
"io"
"math"
)
// ScalableBloomFilter implements a Scalable Bloom Filter as described by
// Almeida, Baquero, Preguica, and Hutchison in Scalable Bloom Filters:
//
// http://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
//
// A Scalable Bloom Filter dynamically adapts to the number of elements in the
// data set while enforcing a tight upper bound on the false-positive rate.
// This works by adding Bloom filters with geometrically decreasing
// false-positive rates as filters become full. The tightening ratio, r,
// controls the filter growth. The compounded probability over the whole series
// converges to a target value, even accounting for an infinite series.
//
// Scalable Bloom Filters are useful for cases where the size of the data set
// isn't known a priori and memory constraints aren't of particular concern.
// For situations where memory is bounded, consider using Inverse or Stable
// Bloom Filters.
type ScalableBloomFilter struct {
filters []*PartitionedBloomFilter // filters with geometrically decreasing error rates
r float64 // tightening ratio
fp float64 // target false-positive rate
p float64 // partition fill ratio
hint uint // filter size hint for first filter
s uint // space growth factor for successive filters. 2|4 recommended.
// number of additions since the last fill-ratio check,
// used to determine when to add a new filter.
// Since fill ratios are estimated from the number of additions
// rather than the actual set bits, this amortizes the cost
// of checking the true fill ratio.
// This matters in particular when adding many duplicate keys: duplicates do
// not increase the number of set bits but can artificially inflate the
// estimated fill ratio, which tracks inserts.
// Reset whenever another filter is added.
additionsSinceFillRatioCheck uint
}
const fillCheckFraction = 100
// NewScalableBloomFilter creates a new Scalable Bloom Filter with the
// specified target false-positive rate and tightening ratio. Use
// NewDefaultScalableBloomFilter if you don't want to calculate these
// parameters.
func NewScalableBloomFilter(hint uint, fpRate, r float64) *ScalableBloomFilter {
s := &ScalableBloomFilter{
filters: make([]*PartitionedBloomFilter, 0, 1),
r: r,
fp: fpRate,
p: fillRatio,
hint: hint,
s: 4,
}
s.addFilter()
return s
}
// NewDefaultScalableBloomFilter creates a new Scalable Bloom Filter with the
// specified target false-positive rate and an optimal tightening ratio.
func NewDefaultScalableBloomFilter(fpRate float64) *ScalableBloomFilter {
return NewScalableBloomFilter(10000, fpRate, 0.8)
}
// Capacity returns the current Scalable Bloom Filter capacity, which is the
// sum of the capacities for the contained series of Bloom filters.
func (s *ScalableBloomFilter) Capacity() uint {
capacity := uint(0)
for _, bf := range s.filters {
capacity += bf.Capacity()
}
return capacity
}
// K returns the number of hash functions used in each Bloom filter.
// Returns the highest value, which belongs to the most recently added filter.
func (s *ScalableBloomFilter) K() uint {
return s.filters[len(s.filters)-1].K()
}
// FillRatio returns the average ratio of set bits across every filter.
func (s *ScalableBloomFilter) FillRatio() float64 {
var sum, count float64
for _, filter := range s.filters {
capacity := filter.Capacity()
sum += filter.FillRatio() * float64(capacity)
count += float64(capacity)
}
return sum / count
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives but a zero probability of false
// negatives.
func (s *ScalableBloomFilter) Test(data []byte) bool {
// Querying is made by testing for the presence in each filter.
for _, bf := range s.filters {
if bf.Test(data) {
return true
}
}
return false
}
// Add will add the data to the Bloom filter. It returns the filter to allow
// for chaining.
func (s *ScalableBloomFilter) Add(data []byte) Filter {
idx := len(s.filters) - 1
// If the last filter has reached its fill ratio, add a new one.
// While the estimated fill ratio is cheap to calculate, it overestimates how full a filter
// may be because it doesn't account for duplicate key inserts.
// Therefore, use the estimated fill ratio to decide when to add a new filter, but
// throttle it by only checking the actual fill ratio once we've performed more
// inserts than some fraction of the filter's optimal cardinality since the last check.
// This prevents us from running expensive fill ratio checks too often on both ends:
// 1. When the filter is underutilized and the estimated fill ratio
// is below our target fill ratio.
// 2. When the filter is close to its target utilization, duplicate inserts
// will quickly inflate the estimated fill ratio. By throttling this check to
// every n inserts, where n is some fraction of the optimal key count,
// we amortize the cost of the fill ratio check.
if s.filters[idx].EstimatedFillRatio() >= s.p && s.additionsSinceFillRatioCheck >= s.filters[idx].OptimalCount()/fillCheckFraction {
s.additionsSinceFillRatioCheck = 0
// calculate the actual fill ratio & update the estimated count for the filter. If the actual fill ratio
// is above the target fill ratio, add a new filter.
if ratio := s.filters[idx].UpdateCount(); ratio >= s.p {
s.addFilter()
idx++
}
}
s.filters[idx].Add(data)
s.additionsSinceFillRatioCheck++
return s
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not.
func (s *ScalableBloomFilter) TestAndAdd(data []byte) bool {
member := s.Test(data)
s.Add(data)
return member
}
// Reset restores the Bloom filter to its original state. It returns the filter
// to allow for chaining.
func (s *ScalableBloomFilter) Reset() *ScalableBloomFilter {
s.filters = make([]*PartitionedBloomFilter, 0, 1)
s.addFilter()
return s
}
// addFilter adds a new Bloom filter with a restricted false-positive rate to
// the Scalable Bloom Filter.
func (s *ScalableBloomFilter) addFilter() {
fpRate := s.fp * math.Pow(s.r, float64(len(s.filters)))
var p *PartitionedBloomFilter
// first filter is created with a size determined by the hint.
// successive filters are created with a size determined by the
// previous filter's capacity and the space growth factor.
if len(s.filters) == 0 {
p = NewPartitionedBloomFilter(s.hint, fpRate)
} else {
p = NewPartitionedBloomFilterWithCapacity(s.filters[len(s.filters)-1].Capacity()*s.s, fpRate)
}
if len(s.filters) > 0 {
p.SetHash(s.filters[0].hash)
}
s.filters = append(s.filters, p)
s.additionsSinceFillRatioCheck = 0
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (s *ScalableBloomFilter) SetHash(h hash.Hash64) {
for _, bf := range s.filters {
bf.SetHash(h)
}
}
// WriteTo writes a binary representation of the ScalableBloomFilter to an i/o stream.
// It returns the number of bytes written.
func (s *ScalableBloomFilter) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, s.r)
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, s.fp)
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, s.p)
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(s.hint))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(s.s))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(s.additionsSinceFillRatioCheck))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(len(s.filters)))
if err != nil {
return 0, err
}
var numBytes int64
for _, filter := range s.filters {
num, err := filter.WriteTo(stream)
if err != nil {
return 0, err
}
numBytes += num
}
// header: r, fp, p (float64) plus hint, s, additionsSinceFillRatioCheck, and filter count (uint64)
return numBytes + int64(3*binary.Size(float64(0))+4*binary.Size(uint64(0))), err
}
// ReadFrom reads a binary representation of ScalableBloomFilter (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (s *ScalableBloomFilter) ReadFrom(stream io.Reader) (int64, error) {
var r, fp, p float64
var hint, growthFactor, additions, len uint64
err := binary.Read(stream, binary.BigEndian, &r)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &fp)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &p)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &hint)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &growthFactor)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &additions)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &len)
if err != nil {
return 0, err
}
var numBytes int64
filters := make([]*PartitionedBloomFilter, len)
for i := range filters {
filter := NewPartitionedBloomFilter(0, fp)
num, err := filter.ReadFrom(stream)
if err != nil {
return 0, err
}
numBytes += num
filters[i] = filter
}
s.r = r
s.fp = fp
s.p = p
s.hint = uint(hint)
s.s = uint(growthFactor)
s.additionsSinceFillRatioCheck = uint(additions)
s.filters = filters
// header: r, fp, p (float64) plus hint, s, additionsSinceFillRatioCheck, and filter count (uint64)
return numBytes + int64(3*binary.Size(float64(0))+4*binary.Size(uint64(0))), nil
}
// GobEncode implements gob.GobEncoder interface.
func (s *ScalableBloomFilter) GobEncode() ([]byte, error) {
var buf bytes.Buffer
_, err := s.WriteTo(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode implements gob.GobDecoder interface.
func (s *ScalableBloomFilter) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
_, err := s.ReadFrom(buf)
return err
}
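// Editor's note: a usage sketch added for illustration, not part of the
// upstream BoomFilters code. The keys and parameters are invented.
func exampleScalableBloomFilterUsage() {
	// hint=10000 initial capacity, 1% target false-positive rate, 0.8 tightening ratio.
	sbf := NewScalableBloomFilter(10000, 0.01, 0.8)
	for i := 0; i < 50000; i++ {
		sbf.Add([]byte{byte(i), byte(i >> 8), byte(i >> 16)})
	}
	_ = sbf.Test([]byte{0, 0, 0}) // expected true for a previously added key
	_ = sbf.FillRatio()           // average fill across all underlying filters
}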

@ -0,0 +1,333 @@
package boom
import (
"bytes"
"encoding/binary"
"hash"
"hash/fnv"
"io"
"math"
"math/rand"
)
// StableBloomFilter implements a Stable Bloom Filter as described by Deng and
// Rafiei in Approximately Detecting Duplicates for Streaming Data using Stable
// Bloom Filters:
//
// http://webdocs.cs.ualberta.ca/~drafiei/papers/DupDet06Sigmod.pdf
//
// A Stable Bloom Filter (SBF) continuously evicts stale information so that it
// has room for more recent elements. Like traditional Bloom filters, an SBF
// has a non-zero probability of false positives, which is controlled by
// several parameters. Unlike the classic Bloom filter, an SBF has a tight
// upper bound on the rate of false positives while introducing a non-zero rate
// of false negatives. The false-positive rate of a classic Bloom filter
// eventually reaches 1, after which all queries result in a false positive.
// The stable-point property of an SBF means the false-positive rate
// asymptotically approaches a configurable fixed constant. A classic Bloom
// filter is actually a special case of SBF where the eviction rate is zero, so
// this package provides support for them as well.
//
// Stable Bloom Filters are useful for cases where the size of the data set
// isn't known a priori, which is a requirement for traditional Bloom filters,
// and memory is bounded. For example, an SBF can be used to deduplicate
// events from an unbounded event stream with a specified upper bound on false
// positives and minimal false negatives.
type StableBloomFilter struct {
cells *Buckets // filter data
hash hash.Hash64 // hash function (kernel for all k functions)
m uint // number of cells
p uint // number of cells to decrement
k uint // number of hash functions
max uint8 // cell max value
indexBuffer []uint // buffer used to cache indices
}
// NewStableBloomFilter creates a new Stable Bloom Filter with m cells and d
// bits allocated per cell optimized for the target false-positive rate. Use
// NewDefaultStableBloomFilter if you don't want to calculate d.
func NewStableBloomFilter(m uint, d uint8, fpRate float64) *StableBloomFilter {
k := OptimalK(fpRate) / 2
if k > m {
k = m
} else if k <= 0 {
k = 1
}
cells := NewBuckets(m, d)
return &StableBloomFilter{
hash: fnv.New64(),
m: m,
k: k,
p: optimalStableP(m, k, d, fpRate),
max: cells.MaxBucketValue(),
cells: cells,
indexBuffer: make([]uint, k),
}
}
// NewDefaultStableBloomFilter creates a new Stable Bloom Filter with m 1-bit
// cells, optimized for cases where there is no prior knowledge of the input
// data stream, while maintaining an upper bound on false positives using the
// provided rate.
func NewDefaultStableBloomFilter(m uint, fpRate float64) *StableBloomFilter {
return NewStableBloomFilter(m, 1, fpRate)
}
// NewUnstableBloomFilter creates a new special case of Stable Bloom Filter
// which is a traditional Bloom filter with m bits and an optimal number of
// hash functions for the target false-positive rate. Unlike the stable
// variant, data is not evicted and a cell contains a maximum of 1 hash value.
func NewUnstableBloomFilter(m uint, fpRate float64) *StableBloomFilter {
var (
cells = NewBuckets(m, 1)
k = OptimalK(fpRate)
)
return &StableBloomFilter{
hash: fnv.New64(),
m: m,
k: k,
p: 0,
max: cells.MaxBucketValue(),
cells: cells,
indexBuffer: make([]uint, k),
}
}
// Cells returns the number of cells in the Stable Bloom Filter.
func (s *StableBloomFilter) Cells() uint {
return s.m
}
// K returns the number of hash functions.
func (s *StableBloomFilter) K() uint {
return s.k
}
// P returns the number of cells decremented on every add.
func (s *StableBloomFilter) P() uint {
return s.p
}
// StablePoint returns the limit of the expected fraction of zeros in the
// Stable Bloom Filter when the number of iterations goes to infinity. When
// this limit is reached, the Stable Bloom Filter is considered stable.
func (s *StableBloomFilter) StablePoint() float64 {
var (
subDenom = float64(s.p) * (1/float64(s.k) - 1/float64(s.m))
denom = 1 + 1/subDenom
base = 1 / denom
)
return math.Pow(base, float64(s.max))
}
// FalsePositiveRate returns the upper bound on false positives when the filter
// has become stable.
func (s *StableBloomFilter) FalsePositiveRate() float64 {
return math.Pow(1-s.StablePoint(), float64(s.k))
}
// Test will test for membership of the data and returns true if it is a
// member, false if not. This is a probabilistic test, meaning there is a
// non-zero probability of false positives and false negatives.
func (s *StableBloomFilter) Test(data []byte) bool {
lower, upper := hashKernel(data, s.hash)
// If any of the K cells are 0, then it's not a member.
for i := uint(0); i < s.k; i++ {
if s.cells.Get((uint(lower)+uint(upper)*i)%s.m) == 0 {
return false
}
}
return true
}
// Add will add the data to the Stable Bloom Filter. It returns the filter to
// allow for chaining.
func (s *StableBloomFilter) Add(data []byte) Filter {
// Randomly decrement p cells to make room for new elements.
s.decrement()
lower, upper := hashKernel(data, s.hash)
// Set the K cells to max.
for i := uint(0); i < s.k; i++ {
s.cells.Set((uint(lower)+uint(upper)*i)%s.m, s.max)
}
return s
}
// TestAndAdd is equivalent to calling Test followed by Add. It returns true if
// the data is a member, false if not.
func (s *StableBloomFilter) TestAndAdd(data []byte) bool {
lower, upper := hashKernel(data, s.hash)
member := true
// If any of the K cells are 0, then it's not a member.
for i := uint(0); i < s.k; i++ {
s.indexBuffer[i] = (uint(lower) + uint(upper)*i) % s.m
if s.cells.Get(s.indexBuffer[i]) == 0 {
member = false
}
}
// Randomly decrement p cells to make room for new elements.
s.decrement()
// Set the K cells to max.
for _, idx := range s.indexBuffer {
s.cells.Set(idx, s.max)
}
return member
}
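// Illustrative sketch (not part of the upstream file): deduplicating an
// unbounded stream of event IDs, the use case described in the type comment
// above. Items reported as duplicates may occasionally be fresh (false
// positives, bounded by FalsePositiveRate), and a few true duplicates may
// slip through (false negatives).
func exampleDeduplicate(events [][]byte) [][]byte {
	// 10,000 one-bit cells targeting a 1% stable false-positive rate.
	sbf := NewDefaultStableBloomFilter(10000, 0.01)

	fresh := make([][]byte, 0, len(events))
	for _, e := range events {
		if !sbf.TestAndAdd(e) {
			// Probably not seen before: keep it.
			fresh = append(fresh, e)
		}
	}
	return fresh
}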
// Reset restores the Stable Bloom Filter to its original state. It returns the
// filter to allow for chaining.
func (s *StableBloomFilter) Reset() *StableBloomFilter {
s.cells.Reset()
return s
}
// decrement will decrement a random cell and (p-1) adjacent cells by 1. This
// is faster than generating p random numbers. Although the processes of
// picking the p cells are not independent, each cell has a probability of p/m
// for being picked at each iteration, which means the properties still hold.
func (s *StableBloomFilter) decrement() {
r := rand.Intn(int(s.m))
for i := uint(0); i < s.p; i++ {
idx := (r + int(i)) % int(s.m)
s.cells.Increment(uint(idx), -1)
}
}
// SetHash sets the hashing function used in the filter.
// For the effect on false positive rates see: https://github.com/tylertreat/BoomFilters/pull/1
func (s *StableBloomFilter) SetHash(h hash.Hash64) {
s.hash = h
}
// WriteTo writes a binary representation of the StableBloomFilter to an i/o stream.
// It returns the number of bytes written.
func (s *StableBloomFilter) WriteTo(stream io.Writer) (int64, error) {
err := binary.Write(stream, binary.BigEndian, uint64(s.m))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(s.p))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, uint64(s.k))
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, s.max)
if err != nil {
return 0, err
}
err = binary.Write(stream, binary.BigEndian, int64(len(s.indexBuffer)))
if err != nil {
return 0, err
}
for _, index := range s.indexBuffer {
err = binary.Write(stream, binary.BigEndian, uint64(index))
if err != nil {
return 0, err
}
}
n, err := s.cells.WriteTo(stream)
if err != nil {
return 0, err
}
return int64((3+len(s.indexBuffer))*binary.Size(uint64(0))) +
int64(1*binary.Size(uint8(0))) + int64(1*binary.Size(int64(0))) + n, err
}
// ReadFrom reads a binary representation of StableBloomFilter (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (s *StableBloomFilter) ReadFrom(stream io.Reader) (int64, error) {
var m, p, k, bufferLen uint64
var max uint8
err := binary.Read(stream, binary.BigEndian, &m)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &p)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &k)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &max)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &bufferLen)
if err != nil {
return 0, err
}
indexBuffer := make([]uint, bufferLen)
var index uint64
for i := range indexBuffer {
err = binary.Read(stream, binary.BigEndian, &index)
if err != nil {
return 0, err
}
indexBuffer[i] = uint(index)
}
s.m = uint(m)
s.p = uint(p)
s.k = uint(k)
s.max = max
s.indexBuffer = indexBuffer
n, err := s.cells.ReadFrom(stream)
if err != nil {
return 0, err
}
return int64((3+len(s.indexBuffer))*binary.Size(uint64(0))) +
int64(1*binary.Size(uint8(0))) + int64(1*binary.Size(int64(0))) + n, nil
}
// GobEncode implements gob.GobEncoder interface.
func (s *StableBloomFilter) GobEncode() ([]byte, error) {
var buf bytes.Buffer
_, err := s.WriteTo(&buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// GobDecode implements gob.GobDecoder interface.
func (s *StableBloomFilter) GobDecode(data []byte) error {
buf := bytes.NewBuffer(data)
_, err := s.ReadFrom(buf)
return err
}
// optimalStableP returns the optimal number of cells to decrement, p, per
// iteration for the provided parameters of an SBF.
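// For example (illustrative): with m = 1000 cells, k = 3 hash functions,
// d = 1 bit per cell, and a 1% target false-positive rate, the formula gives
// roughly 10.96, which truncates to p = 10.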
func optimalStableP(m, k uint, d uint8, fpRate float64) uint {
var (
max = math.Pow(2, float64(d)) - 1
subDenom = math.Pow(1-math.Pow(fpRate, 1/float64(k)), 1/max)
denom = (1/subDenom - 1) * (1/float64(k) - 1/float64(m))
)
p := uint(1 / denom)
if p <= 0 {
p = 1
}
return p
}

@@ -0,0 +1,128 @@
package boom
import (
"bytes"
"container/heap"
)
// Element represents a piece of data and its frequency.
type Element struct {
Data []byte
Freq uint64
}
// An elementHeap is a min-heap of elements.
type elementHeap []*Element
func (e elementHeap) Len() int { return len(e) }
func (e elementHeap) Less(i, j int) bool { return e[i].Freq < e[j].Freq }
func (e elementHeap) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
func (e *elementHeap) Push(x interface{}) {
*e = append(*e, x.(*Element))
}
func (e *elementHeap) Pop() interface{} {
old := *e
n := len(old)
x := old[n-1]
*e = old[0 : n-1]
return x
}
// TopK uses a Count-Min Sketch to calculate the top-K frequent elements in a
// stream.
type TopK struct {
cms *CountMinSketch
k uint
n uint
elements *elementHeap
}
// NewTopK creates a new TopK backed by a Count-Min sketch whose relative
// accuracy is within a factor of epsilon with probability delta. It tracks the
// k-most frequent elements.
func NewTopK(epsilon, delta float64, k uint) *TopK {
elements := make(elementHeap, 0, k)
heap.Init(&elements)
return &TopK{
cms: NewCountMinSketch(epsilon, delta),
k: k,
elements: &elements,
}
}
// Add will add the data to the Count-Min Sketch and update the top-k heap if
// applicable. Returns the TopK to allow for chaining.
func (t *TopK) Add(data []byte) *TopK {
t.cms.Add(data)
t.n++
freq := t.cms.Count(data)
if t.isTop(freq) {
t.insert(data, freq)
}
return t
}
// Elements returns the top-k elements from lowest to highest frequency.
func (t *TopK) Elements() []*Element {
if t.elements.Len() == 0 {
return make([]*Element, 0)
}
elements := make(elementHeap, t.elements.Len())
copy(elements, *t.elements)
heap.Init(&elements)
topK := make([]*Element, 0, t.k)
for elements.Len() > 0 {
topK = append(topK, heap.Pop(&elements).(*Element))
}
return topK
}
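// Illustrative sketch (not part of the upstream file): tracking the five most
// frequent keys in a stream. The epsilon/delta values are arbitrary example
// accuracy parameters for the underlying Count-Min Sketch.
func exampleTopKeys(stream [][]byte) []*Element {
	topk := NewTopK(0.001, 0.99, 5)
	for _, key := range stream {
		topk.Add(key)
	}
	// Results are ordered from lowest to highest frequency.
	return topk.Elements()
}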
// Reset restores the TopK to its original state. It returns itself to allow
// for chaining.
func (t *TopK) Reset() *TopK {
t.cms.Reset()
elements := make(elementHeap, 0, t.k)
heap.Init(&elements)
t.elements = &elements
t.n = 0
return t
}
// isTop indicates if the given frequency falls within the top-k heap.
func (t *TopK) isTop(freq uint64) bool {
if t.elements.Len() < int(t.k) {
return true
}
return freq >= (*t.elements)[0].Freq
}
// insert adds the data to the top-k heap. If the data is already an element,
// the frequency is updated. If the heap already has k elements, the element
// with the minimum frequency is removed.
func (t *TopK) insert(data []byte, freq uint64) {
for i, element := range *t.elements {
if bytes.Equal(data, element.Data) {
// Element already in top-k, replace it with new frequency.
heap.Remove(t.elements, i)
element.Freq = freq
heap.Push(t.elements, element)
return
}
}
if t.elements.Len() == int(t.k) {
// Remove minimum-frequency element.
heap.Pop(t.elements)
}
// Add element to top-k.
heap.Push(t.elements, &Element{Data: data, Freq: freq})
}

@@ -1172,6 +1172,9 @@ github.com/oschwald/geoip2-golang
# github.com/oschwald/maxminddb-golang v1.11.0
## explicit; go 1.19
github.com/oschwald/maxminddb-golang
# github.com/owen-d/BoomFilters v0.0.0-20230914145927-1ad00a0ec6fd
## explicit; go 1.20
github.com/owen-d/BoomFilters/boom
# github.com/pierrec/lz4/v4 v4.1.18
## explicit; go 1.14
github.com/pierrec/lz4/v4
