|
|
|
|
@ -9,15 +9,14 @@ import ( |
|
|
|
|
"path/filepath" |
|
|
|
|
"runtime" |
|
|
|
|
"runtime/pprof" |
|
|
|
|
"sort" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
"unsafe" |
|
|
|
|
|
|
|
|
|
"github.com/fabxc/tsdb" |
|
|
|
|
"github.com/fabxc/tsdb/labels" |
|
|
|
|
dto "github.com/prometheus/client_model/go" |
|
|
|
|
"github.com/prometheus/common/expfmt" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
promlabels "github.com/prometheus/prometheus/pkg/labels" |
|
|
|
|
"github.com/prometheus/prometheus/pkg/textparse" |
|
|
|
|
"github.com/spf13/cobra" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@ -97,7 +96,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { |
|
|
|
|
} |
|
|
|
|
b.storage = st |
|
|
|
|
|
|
|
|
|
var metrics []model.Metric |
|
|
|
|
var metrics []labels.Labels |
|
|
|
|
|
|
|
|
|
measureTime("readData", func() { |
|
|
|
|
f, err := os.Open(args[0]) |
|
|
|
|
@ -133,23 +132,11 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *writeBenchmark) ingestScrapes(metrics []model.Metric, scrapeCount int) error { |
|
|
|
|
func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) error { |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
var mu sync.Mutex |
|
|
|
|
var total uint64 |
|
|
|
|
|
|
|
|
|
lbls := make([]labels.Labels, 0, len(metrics)) |
|
|
|
|
|
|
|
|
|
for _, m := range metrics { |
|
|
|
|
lset := make(labels.Labels, 0, len(m)) |
|
|
|
|
for k, v := range m { |
|
|
|
|
lset = append(lset, labels.Label{Name: string(k), Value: string(v)}) |
|
|
|
|
} |
|
|
|
|
sort.Sort(lset) |
|
|
|
|
|
|
|
|
|
lbls = append(lbls, lset) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for i := 0; i < scrapeCount; i += 50 { |
|
|
|
|
lbls := lbls |
|
|
|
|
for len(lbls) > 0 { |
|
|
|
|
@ -305,44 +292,30 @@ func measureTime(stage string, f func()) { |
|
|
|
|
fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func readPrometheusLabels(r io.Reader, n int) ([]model.Metric, error) { |
|
|
|
|
dec := expfmt.NewDecoder(r, expfmt.FmtProtoText) |
|
|
|
|
|
|
|
|
|
var mets []model.Metric |
|
|
|
|
fps := map[model.Fingerprint]struct{}{} |
|
|
|
|
var mf dto.MetricFamily |
|
|
|
|
var dups int |
|
|
|
|
func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { |
|
|
|
|
b, err := ioutil.ReadAll(r) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for i := 0; i < n; { |
|
|
|
|
if err := dec.Decode(&mf); err != nil { |
|
|
|
|
if err == io.EOF { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
p := textparse.New(b) |
|
|
|
|
i := 0 |
|
|
|
|
var mets []labels.Labels |
|
|
|
|
hashes := map[uint64]struct{}{} |
|
|
|
|
|
|
|
|
|
for _, m := range mf.GetMetric() { |
|
|
|
|
met := make(model.Metric, len(m.GetLabel())+1) |
|
|
|
|
met["__name__"] = model.LabelValue(mf.GetName()) |
|
|
|
|
for p.Next() && i < n { |
|
|
|
|
m := make(labels.Labels, 0, 10) |
|
|
|
|
p.Metric((*promlabels.Labels)(unsafe.Pointer(&m))) |
|
|
|
|
|
|
|
|
|
for _, l := range m.GetLabel() { |
|
|
|
|
met[model.LabelName(l.GetName())] = model.LabelValue(l.GetValue()) |
|
|
|
|
} |
|
|
|
|
if _, ok := fps[met.Fingerprint()]; ok { |
|
|
|
|
dups++ |
|
|
|
|
} else { |
|
|
|
|
mets = append(mets, met) |
|
|
|
|
fps[met.Fingerprint()] = struct{}{} |
|
|
|
|
} |
|
|
|
|
i++ |
|
|
|
|
h := m.Hash() |
|
|
|
|
if _, ok := hashes[h]; ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
mets = append(mets, m) |
|
|
|
|
hashes[h] = struct{}{} |
|
|
|
|
i++ |
|
|
|
|
} |
|
|
|
|
if dups > 0 { |
|
|
|
|
fmt.Println("dropped duplicate metrics:", dups) |
|
|
|
|
} |
|
|
|
|
fmt.Println("read metrics", len(mets[:n])) |
|
|
|
|
|
|
|
|
|
return mets[:n], nil |
|
|
|
|
return mets, p.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func exitWithError(err error) { |
|
|
|
|
|