|
|
|
|
@ -29,20 +29,20 @@ import ( |
|
|
|
|
"sort" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type LevigoMetricPersistence struct { |
|
|
|
|
fingerprintHighWaterMarks *storage.LevigoPersistence |
|
|
|
|
fingerprintLabelPairs *storage.LevigoPersistence |
|
|
|
|
fingerprintLowWaterMarks *storage.LevigoPersistence |
|
|
|
|
fingerprintSamples *storage.LevigoPersistence |
|
|
|
|
labelNameFingerprints *storage.LevigoPersistence |
|
|
|
|
labelPairFingerprints *storage.LevigoPersistence |
|
|
|
|
metricMembershipIndex *index.LevigoMembershipIndex |
|
|
|
|
type LevelDBMetricPersistence struct { |
|
|
|
|
fingerprintHighWaterMarks *storage.LevelDBPersistence |
|
|
|
|
fingerprintLabelPairs *storage.LevelDBPersistence |
|
|
|
|
fingerprintLowWaterMarks *storage.LevelDBPersistence |
|
|
|
|
fingerprintSamples *storage.LevelDBPersistence |
|
|
|
|
labelNameFingerprints *storage.LevelDBPersistence |
|
|
|
|
labelPairFingerprints *storage.LevelDBPersistence |
|
|
|
|
metricMembershipIndex *index.LevelDBMembershipIndex |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type levigoOpener func() |
|
|
|
|
type leveldbOpener func() |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) Close() error { |
|
|
|
|
log.Printf("Closing LevigoPersistence storage containers...") |
|
|
|
|
func (l *LevelDBMetricPersistence) Close() error { |
|
|
|
|
log.Printf("Closing LevelDBPersistence storage containers...") |
|
|
|
|
|
|
|
|
|
var persistences = []struct { |
|
|
|
|
name string |
|
|
|
|
@ -85,11 +85,11 @@ func (l *LevigoMetricPersistence) Close() error { |
|
|
|
|
closer := persistence.closer |
|
|
|
|
|
|
|
|
|
if closer != nil { |
|
|
|
|
log.Printf("Closing LevigoPersistence storage container: %s\n", name) |
|
|
|
|
log.Printf("Closing LevelDBPersistence storage container: %s\n", name) |
|
|
|
|
closingError := closer.Close() |
|
|
|
|
|
|
|
|
|
if closingError != nil { |
|
|
|
|
log.Printf("Could not close a LevigoPersistence storage container; inconsistencies are possible: %q\n", closingError) |
|
|
|
|
log.Printf("Could not close a LevelDBPersistence storage container; inconsistencies are possible: %q\n", closingError) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
errorChannel <- closingError |
|
|
|
|
@ -106,27 +106,27 @@ func (l *LevigoMetricPersistence) Close() error { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
log.Printf("Successfully closed all LevigoPersistence storage containers.") |
|
|
|
|
log.Printf("Successfully closed all LevelDBPersistence storage containers.") |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, error) { |
|
|
|
|
log.Printf("Opening LevigoPersistence storage containers...") |
|
|
|
|
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { |
|
|
|
|
log.Printf("Opening LevelDBPersistence storage containers...") |
|
|
|
|
|
|
|
|
|
errorChannel := make(chan error, 7) |
|
|
|
|
|
|
|
|
|
emission := &LevigoMetricPersistence{} |
|
|
|
|
emission := &LevelDBMetricPersistence{} |
|
|
|
|
|
|
|
|
|
var subsystemOpeners = []struct { |
|
|
|
|
name string |
|
|
|
|
opener levigoOpener |
|
|
|
|
opener leveldbOpener |
|
|
|
|
}{ |
|
|
|
|
{ |
|
|
|
|
"High-Water Marks by Fingerprint", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.fingerprintHighWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10) |
|
|
|
|
emission.fingerprintHighWaterMarks, anomaly = storage.NewLevelDBPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -134,7 +134,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
"Label Names and Value Pairs by Fingerprint", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.fingerprintLabelPairs, anomaly = storage.NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) |
|
|
|
|
emission.fingerprintLabelPairs, anomaly = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -142,7 +142,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
"Low-Water Marks by Fingerprint", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.fingerprintLowWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10) |
|
|
|
|
emission.fingerprintLowWaterMarks, anomaly = storage.NewLevelDBPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -150,7 +150,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
"Samples by Fingerprint", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.fingerprintSamples, anomaly = storage.NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) |
|
|
|
|
emission.fingerprintSamples, anomaly = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -158,7 +158,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
"Fingerprints by Label Name", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.labelNameFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) |
|
|
|
|
emission.labelNameFingerprints, anomaly = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -166,7 +166,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
"Fingerprints by Label Name and Value Pair", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.labelPairFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) |
|
|
|
|
emission.labelPairFingerprints, anomaly = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -174,7 +174,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
"Metric Membership Index", |
|
|
|
|
func() { |
|
|
|
|
var anomaly error |
|
|
|
|
emission.metricMembershipIndex, anomaly = index.NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) |
|
|
|
|
emission.metricMembershipIndex, anomaly = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) |
|
|
|
|
errorChannel <- anomaly |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
@ -184,7 +184,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
name := subsystem.name |
|
|
|
|
opener := subsystem.opener |
|
|
|
|
|
|
|
|
|
log.Printf("Opening LevigoPersistence storage container: %s\n", name) |
|
|
|
|
log.Printf("Opening LevelDBPersistence storage container: %s\n", name) |
|
|
|
|
|
|
|
|
|
go opener() |
|
|
|
|
} |
|
|
|
|
@ -194,13 +194,13 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, |
|
|
|
|
|
|
|
|
|
if openingError != nil { |
|
|
|
|
|
|
|
|
|
log.Printf("Could not open a LevigoPersistence storage container: %q\n", openingError) |
|
|
|
|
log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError) |
|
|
|
|
|
|
|
|
|
return nil, openingError |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
log.Printf("Successfully opened all LevigoPersistence storage containers.\n") |
|
|
|
|
log.Printf("Successfully opened all LevelDBPersistence storage containers.\n") |
|
|
|
|
|
|
|
|
|
return emission, nil |
|
|
|
|
} |
|
|
|
|
@ -269,12 +269,12 @@ func fingerprintDDOFromByteArray(fingerprint []byte) *data.FingerprintDDO { |
|
|
|
|
return fingerprintDDO |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) hasIndexMetric(ddo *data.MetricDDO) (bool, error) { |
|
|
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo) |
|
|
|
|
return l.metricMembershipIndex.Has(ddoKey) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) indexMetric(ddo *data.MetricDDO) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) indexMetric(ddo *data.MetricDDO) error { |
|
|
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo) |
|
|
|
|
return l.metricMembershipIndex.Put(ddoKey) |
|
|
|
|
} |
|
|
|
|
@ -292,17 +292,17 @@ func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, erro |
|
|
|
|
return nil, errors.New("Unknown error in generating FingerprintDDO from message.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) HasLabelPair(ddo *data.LabelPairDDO) (bool, error) { |
|
|
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo) |
|
|
|
|
return l.labelPairFingerprints.Has(ddoKey) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) HasLabelName(ddo *data.LabelNameDDO) (bool, error) { |
|
|
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo) |
|
|
|
|
return l.labelNameFingerprints.Has(ddoKey) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDDO) (*data.FingerprintCollectionDDO, error) { |
|
|
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo) |
|
|
|
|
if get, getError := l.labelPairFingerprints.Get(ddoKey); getError == nil { |
|
|
|
|
value := &data.FingerprintCollectionDDO{} |
|
|
|
|
@ -317,7 +317,7 @@ func (l *LevigoMetricPersistence) GetLabelPairFingerprints(ddo *data.LabelPairDD |
|
|
|
|
return nil, errors.New("Unknown error while getting label name and value pair fingerprints.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDDO) (*data.FingerprintCollectionDDO, error) { |
|
|
|
|
ddoKey := coding.NewProtocolBufferEncoder(ddo) |
|
|
|
|
if get, getError := l.labelNameFingerprints.Get(ddoKey); getError == nil { |
|
|
|
|
value := &data.FingerprintCollectionDDO{} |
|
|
|
|
@ -333,19 +333,19 @@ func (l *LevigoMetricPersistence) GetLabelNameFingerprints(ddo *data.LabelNameDD |
|
|
|
|
return nil, errors.New("Unknown error while getting label name fingerprints.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) setLabelPairFingerprints(labelPair *data.LabelPairDDO, fingerprints *data.FingerprintCollectionDDO) error { |
|
|
|
|
labelPairEncoded := coding.NewProtocolBufferEncoder(labelPair) |
|
|
|
|
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) |
|
|
|
|
return l.labelPairFingerprints.Put(labelPairEncoded, fingerprintsEncoded) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) setLabelNameFingerprints(labelName *data.LabelNameDDO, fingerprints *data.FingerprintCollectionDDO) error { |
|
|
|
|
labelNameEncoded := coding.NewProtocolBufferEncoder(labelName) |
|
|
|
|
fingerprintsEncoded := coding.NewProtocolBufferEncoder(fingerprints) |
|
|
|
|
return l.labelNameFingerprints.Put(labelNameEncoded, fingerprintsEncoded) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) appendLabelPairFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { |
|
|
|
|
if has, hasError := l.HasLabelPair(labelPair); hasError == nil { |
|
|
|
|
var fingerprints *data.FingerprintCollectionDDO |
|
|
|
|
if has { |
|
|
|
|
@ -368,7 +368,7 @@ func (l *LevigoMetricPersistence) appendLabelPairFingerprint(labelPair *data.Lab |
|
|
|
|
return errors.New("Unknown error when appending fingerprint to label name and value pair.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) appendLabelNameFingerprint(labelPair *data.LabelPairDDO, fingerprint *data.FingerprintDDO) error { |
|
|
|
|
labelName := &data.LabelNameDDO{ |
|
|
|
|
Name: labelPair.Name, |
|
|
|
|
} |
|
|
|
|
@ -395,7 +395,7 @@ func (l *LevigoMetricPersistence) appendLabelNameFingerprint(labelPair *data.Lab |
|
|
|
|
return errors.New("Unknown error when appending fingerprint to label name and value pair.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error { |
|
|
|
|
if fingerprintDDO, fingerprintDDOError := fingerprintDDOForMessage(ddo); fingerprintDDOError == nil { |
|
|
|
|
labelPairCollectionDDO := &data.LabelPairCollectionDDO{ |
|
|
|
|
Member: ddo.LabelPair, |
|
|
|
|
@ -446,7 +446,7 @@ func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error |
|
|
|
|
return errors.New("Unknown error in appending label pairs to fingerprint.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) AppendSample(sample *model.Sample) error { |
|
|
|
|
func (l *LevelDBMetricPersistence) AppendSample(sample *model.Sample) error { |
|
|
|
|
fmt.Printf("Sample: %q\n", sample) |
|
|
|
|
|
|
|
|
|
metricDDO := ddoFromSample(sample) |
|
|
|
|
@ -494,7 +494,7 @@ func (l *LevigoMetricPersistence) AppendSample(sample *model.Sample) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetLabelNames() ([]string, error) { |
|
|
|
|
if getAll, getAllError := l.labelNameFingerprints.GetAll(); getAllError == nil { |
|
|
|
|
result := make([]string, 0, len(getAll)) |
|
|
|
|
labelNameDDO := &data.LabelNameDDO{} |
|
|
|
|
@ -516,7 +516,7 @@ func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) { |
|
|
|
|
return nil, errors.New("Unknown error encountered when querying label names.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { |
|
|
|
|
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { |
|
|
|
|
result := make([]model.LabelPairs, 0, len(getAll)) |
|
|
|
|
labelPairDDO := &data.LabelPairDDO{} |
|
|
|
|
@ -541,7 +541,7 @@ func (l *LevigoMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { |
|
|
|
|
return nil, errors.New("Unknown error encountered when querying label pairs.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { |
|
|
|
|
log.Printf("GetMetrics()\n") |
|
|
|
|
|
|
|
|
|
if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { |
|
|
|
|
@ -596,7 +596,7 @@ func (l *LevigoMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { |
|
|
|
|
return nil, errors.New("Unknown error encountered when querying metrics.") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) { |
|
|
|
|
metricDDO := ddoFromMetric(metric) |
|
|
|
|
|
|
|
|
|
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { |
|
|
|
|
@ -670,7 +670,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric model.Metric) (* |
|
|
|
|
|
|
|
|
|
// TODO(mtp): Holes in the data!
|
|
|
|
|
|
|
|
|
|
func (l *LevigoMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { |
|
|
|
|
func (l *LevelDBMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { |
|
|
|
|
metricDDO := ddoFromMetric(metric) |
|
|
|
|
|
|
|
|
|
if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { |
|
|
|
|
|