diff --git a/Makefile.TRAVIS b/Makefile.TRAVIS index 0441efda61..7640b7587c 100644 --- a/Makefile.TRAVIS +++ b/Makefile.TRAVIS @@ -109,7 +109,7 @@ goprotobuf-stamp: go protoc source instrumentation: instrumentation-stamp instrumentation-stamp: go source - $(GO_GET) github.com/matttproud/golang_instrumentation + $(GO_GET) github.com/prometheus/client_golang touch $@ leveldb: leveldb-stamp diff --git a/main.go b/main.go index 94f3c0cd6d..242de48233 100644 --- a/main.go +++ b/main.go @@ -16,13 +16,14 @@ package main import ( "code.google.com/p/gorest" "flag" - "github.com/matttproud/golang_instrumentation" "github.com/matttproud/prometheus/api" "github.com/matttproud/prometheus/config" "github.com/matttproud/prometheus/retrieval" + "github.com/matttproud/prometheus/retrieval/format" "github.com/matttproud/prometheus/rules" "github.com/matttproud/prometheus/rules/ast" "github.com/matttproud/prometheus/storage/metric/leveldb" + "github.com/prometheus/client_golang" "log" "net/http" "os" @@ -31,8 +32,11 @@ import ( // Commandline flags. var ( - configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") - metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.") + configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") + metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.") + scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.") + ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.") + concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") ) func main() { @@ -58,12 +62,13 @@ func main() { defer persistence.Close() - scrapeResults := make(chan retrieval.Result, 4096) + // Queue depth will need to be exposed + scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) - targetManager := retrieval.NewTargetManager(scrapeResults, 1) + targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) targetManager.AddTargetsFromConfig(conf) - ruleResults := make(chan *rules.Result, 4096) + ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) ast.SetPersistence(persistence) ruleManager := rules.NewRuleManager(ruleResults, conf.Global.EvaluationInterval) @@ -85,12 +90,10 @@ func main() { for { select { case scrapeResult := <-scrapeResults: - //fmt.Printf("scrapeResult -> %s\n", scrapeResult) - for _, sample := range scrapeResult.Samples { - persistence.AppendSample(&sample) + if scrapeResult.Err == nil { + persistence.AppendSample(&scrapeResult.Sample) } case ruleResult := <-ruleResults: - //fmt.Printf("ruleResult -> %s\n", ruleResult) for _, sample := range ruleResult.Samples { persistence.AppendSample(sample) } diff --git a/retrieval/format/discriminator.go b/retrieval/format/discriminator.go new file mode 100644 index 0000000000..edcf6a4230 --- /dev/null +++ b/retrieval/format/discriminator.go @@ -0,0 +1,54 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "fmt" + "net/http" +) + +var ( + DefaultRegistry Registry = ®istry{} +) + +// Registry is responsible for applying a determination strategy to the given +// inputs to determine what Processor can handle this type of input. +type Registry interface { + // ProcessorForRequestHeader interprets a HTTP request header to determine + // what Processor should be used for the given input. + ProcessorForRequestHeader(header http.Header) (Processor, error) +} + +type registry struct { +} + +func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Processor, err error) { + if header == nil { + err = fmt.Errorf("Received illegal and nil header.") + return + } + + prometheusApiVersion := header.Get("X-Prometheus-API-Version") + + switch prometheusApiVersion { + case "0.0.1": + processor = Processor001 + return + default: + err = fmt.Errorf("Unrecognized API version %s", prometheusApiVersion) + return + } + + return +} diff --git a/retrieval/format/discriminator_test.go b/retrieval/format/discriminator_test.go new file mode 100644 index 0000000000..edd8148c85 --- /dev/null +++ b/retrieval/format/discriminator_test.go @@ -0,0 +1,82 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "fmt" + "github.com/matttproud/prometheus/utility/test" + "net/http" + "testing" +) + +func testDiscriminatorHttpHeader(t test.Tester) { + var scenarios = []struct { + input map[string]string + output Processor + err error + }{ + { + output: nil, + err: fmt.Errorf("Received illegal and nil header."), + }, + { + input: map[string]string{"X-Prometheus-API-Version": "0.0.0"}, + output: nil, + err: fmt.Errorf("Unrecognized API version 0.0.0"), + }, + { + input: map[string]string{"X-Prometheus-API-Version": "0.0.1"}, + output: Processor001, + err: nil, + }, + } + + for i, scenario := range scenarios { + var header http.Header + + if len(scenario.input) > 0 { + header = http.Header{} + } + + for key, value := range scenario.input { + header.Add(key, value) + } + + actual, err := DefaultRegistry.ProcessorForRequestHeader(header) + + if scenario.err != err { + if scenario.err != nil && err != nil { + if scenario.err.Error() != err.Error() { + t.Errorf("%d. expected %s, got %s", i, scenario.err, err) + } + } else if scenario.err != nil || err != nil { + t.Errorf("%d. expected %s, got %s", i, scenario.err, err) + } + } + + if scenario.output != actual { + t.Errorf("%d. expected %s, got %s", i, scenario.output, actual) + } + } +} + +func TestDiscriminatorHttpHeader(t *testing.T) { + testDiscriminatorHttpHeader(t) +} + +func BenchmarkDiscriminatorHttpHeader(b *testing.B) { + for i := 0; i < b.N; i++ { + testDiscriminatorHttpHeader(b) + } +} diff --git a/retrieval/format/interface_test.go b/retrieval/format/interface_test.go new file mode 100644 index 0000000000..330140f474 --- /dev/null +++ b/retrieval/format/interface_test.go @@ -0,0 +1,22 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "testing" +) + +func TestInterface(t *testing.T) { + var _ Registry = ®istry{} +} diff --git a/retrieval/format/processor.go b/retrieval/format/processor.go new file mode 100644 index 0000000000..16719218d2 --- /dev/null +++ b/retrieval/format/processor.go @@ -0,0 +1,27 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "github.com/matttproud/prometheus/model" + "io" +) + +// Processor is responsible for decoding the actual message responses from +// stream into a format that can be consumed with the end result written +// to the results channel. +type Processor interface { + // Process performs the work on the input and closes the incoming stream. + Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) +} diff --git a/retrieval/format/processor0_0_1.go b/retrieval/format/processor0_0_1.go new file mode 100644 index 0000000000..4990f4c248 --- /dev/null +++ b/retrieval/format/processor0_0_1.go @@ -0,0 +1,156 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "encoding/json" + "fmt" + "github.com/matttproud/prometheus/model" + "io" + "io/ioutil" + "time" +) + +const ( + baseLabels001 = "baseLabels" + counter001 = "counter" + docstring001 = "docstring" + gauge001 = "gauge" + histogram001 = "histogram" + labels001 = "labels" + metric001 = "metric" + type001 = "type" + value001 = "value" + percentile001 = "percentile" +) + +var ( + Processor001 Processor = &processor001{} +) + +// processor001 is responsible for handling API version 0.0.1. +type processor001 struct { +} + +// entity001 represents a the JSON structure that 0.0.1 uses. +type entity001 []struct { + BaseLabels map[string]string `json:"baseLabels"` + Docstring string `json:"docstring"` + Metric struct { + MetricType string `json:"type"` + Value []struct { + Labels map[string]string `json:"labels"` + Value interface{} `json:"value"` + } `json:"value"` + } `json:"metric"` +} + +func (p *processor001) Process(stream io.ReadCloser, baseLabels model.LabelSet, results chan Result) (err error) { + // TODO(matt): Replace with plain-jane JSON unmarshalling. + defer stream.Close() + + buffer, err := ioutil.ReadAll(stream) + if err != nil { + return + } + + entities := entity001{} + + err = json.Unmarshal(buffer, &entities) + if err != nil { + return + } + + // Swap this to the testable timer. + now := time.Now() + + // TODO(matt): This outer loop is a great basis for parallelization. + for _, entity := range entities { + for _, value := range entity.Metric.Value { + metric := model.Metric{} + for label, labelValue := range baseLabels { + metric[label] = labelValue + } + + for label, labelValue := range entity.BaseLabels { + metric[model.LabelName(label)] = model.LabelValue(labelValue) + } + + for label, labelValue := range value.Labels { + metric[model.LabelName(label)] = model.LabelValue(labelValue) + } + + switch entity.Metric.MetricType { + case gauge001, counter001: + sampleValue, ok := value.Value.(float64) + if !ok { + err = fmt.Errorf("Could not convert value from %s %s to float64.", entity, value) + continue + } + + sample := model.Sample{ + Metric: metric, + Timestamp: now, + Value: model.SampleValue(sampleValue), + } + + results <- Result{ + Err: err, + Sample: sample, + } + + break + + case histogram001: + sampleValue, ok := value.Value.(map[string]interface{}) + if !ok { + err = fmt.Errorf("Could not convert value from %q to a map[string]interface{}.", value.Value) + continue + } + + for percentile, percentileValue := range sampleValue { + individualValue, ok := percentileValue.(float64) + if !ok { + err = fmt.Errorf("Could not convert value from %q to a float64.", percentileValue) + continue + } + + childMetric := make(map[model.LabelName]model.LabelValue, len(metric)+1) + + for k, v := range metric { + childMetric[k] = v + } + + childMetric[model.LabelName(percentile001)] = model.LabelValue(percentile) + + sample := model.Sample{ + Metric: childMetric, + Timestamp: now, + Value: model.SampleValue(individualValue), + } + + results <- Result{ + Err: err, + Sample: sample, + } + } + + break + default: + } + } + } + + return +} diff --git a/retrieval/format/processor0_0_1_test.go b/retrieval/format/processor0_0_1_test.go new file mode 100644 index 0000000000..c7382354ee --- /dev/null +++ b/retrieval/format/processor0_0_1_test.go @@ -0,0 +1,262 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "container/list" + "fmt" + "github.com/matttproud/prometheus/model" + "github.com/matttproud/prometheus/utility/test" + "io/ioutil" + "strings" + "testing" +) + +func testProcessor001Process(t test.Tester) { + var scenarios = []struct { + in string + out []Result + err error + }{ + { + err: fmt.Errorf("unexpected end of JSON input"), + }, + { + in: "[{\"baseLabels\":{\"name\":\"rpc_calls_total\"},\"docstring\":\"RPC calls.\",\"metric\":{\"type\":\"counter\",\"value\":[{\"labels\":{\"service\":\"zed\"},\"value\":25},{\"labels\":{\"service\":\"bar\"},\"value\":25},{\"labels\":{\"service\":\"foo\"},\"value\":25}]}},{\"baseLabels\":{\"name\":\"rpc_latency_microseconds\"},\"docstring\":\"RPC latency.\",\"metric\":{\"type\":\"histogram\",\"value\":[{\"labels\":{\"service\":\"foo\"},\"value\":{\"0.010000\":15.890724674774395,\"0.050000\":15.890724674774395,\"0.500000\":84.63044031436561,\"0.900000\":160.21100853053224,\"0.990000\":172.49828748957728}},{\"labels\":{\"service\":\"zed\"},\"value\":{\"0.010000\":0.0459814091918713,\"0.050000\":0.0459814091918713,\"0.500000\":0.6120456642749681,\"0.900000\":1.355915069887731,\"0.990000\":1.772733213161236}},{\"labels\":{\"service\":\"bar\"},\"value\":{\"0.010000\":78.48563317257356,\"0.050000\":78.48563317257356,\"0.500000\":97.31798360385088,\"0.900000\":109.89202084295582,\"0.990000\":109.99626121011262}}]}}]", + out: []Result{ + { + Sample: model.Sample{ + Metric: model.Metric{"service": "zed", "name": "rpc_calls_total"}, + Value: 25, + }, + }, + { + Sample: model.Sample{ + Metric: model.Metric{"service": "bar", "name": "rpc_calls_total"}, + Value: 25, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"service": "foo", "name": "rpc_calls_total"}, + Value: 25, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.010000", "name": "rpc_latency_microseconds", "service": "zed"}, + Value: 0.04598141, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.010000", "name": "rpc_latency_microseconds", "service": "bar"}, + Value: 78.485634, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.010000", "name": "rpc_latency_microseconds", "service": "foo"}, + Value: 15.890724674774395, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.050000", "name": "rpc_latency_microseconds", "service": "zed"}, + Value: 0.04598141, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.050000", "name": "rpc_latency_microseconds", "service": "bar"}, + Value: 78.485634, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.050000", "name": "rpc_latency_microseconds", "service": "foo"}, + Value: 15.890724674774395, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.500000", "name": "rpc_latency_microseconds", "service": "zed"}, + Value: 0.61204565, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.500000", "name": "rpc_latency_microseconds", "service": "bar"}, + Value: 97.317986, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.500000", "name": "rpc_latency_microseconds", "service": "foo"}, + Value: 84.63044, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.900000", "name": "rpc_latency_microseconds", "service": "zed"}, + Value: 1.3559151, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.900000", "name": "rpc_latency_microseconds", "service": "bar"}, + Value: 109.89202, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.900000", "name": "rpc_latency_microseconds", "service": "foo"}, + Value: 160.21101, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.990000", "name": "rpc_latency_microseconds", "service": "zed"}, + Value: 1.7727332, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.990000", "name": "rpc_latency_microseconds", "service": "bar"}, + Value: 109.99626, + }, + }, + { + Sample: model.Sample{ + + Metric: model.Metric{"percentile": "0.990000", "name": "rpc_latency_microseconds", "service": "foo"}, + Value: 172.49829, + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + inputChannel := make(chan Result, 1024) + + defer func(c chan Result) { + close(c) + }(inputChannel) + + reader := strings.NewReader(scenario.in) + + err := Processor001.Process(ioutil.NopCloser(reader), model.LabelSet{}, inputChannel) + if !test.ErrorEqual(scenario.err, err) { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + continue + } + + if scenario.err != nil && err != nil { + if scenario.err.Error() != err.Error() { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + } + } else if scenario.err != err { + t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err) + } + + delivered := make([]Result, 0) + + for len(inputChannel) != 0 { + delivered = append(delivered, <-inputChannel) + } + + if len(delivered) != len(scenario.out) { + t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered)) + + continue + } + + expectedElements := list.New() + for _, j := range scenario.out { + expectedElements.PushBack(j) + } + + for j := 0; j < len(delivered); j++ { + actual := delivered[j] + + found := false + for element := expectedElements.Front(); element != nil && found == false; element = element.Next() { + candidate := element.Value.(Result) + + if !test.ErrorEqual(candidate.Err, actual.Err) { + continue + } + + if candidate.Sample.Value != actual.Sample.Value { + continue + } + + if len(candidate.Sample.Metric) != len(actual.Sample.Metric) { + continue + } + + labelsMatch := false + + for key, value := range candidate.Sample.Metric { + actualValue, ok := actual.Sample.Metric[key] + if !ok { + break + } + if actualValue == value { + labelsMatch = true + break + } + } + + if !labelsMatch { + continue + } + + // XXX: Test time. + found = true + expectedElements.Remove(element) + } + + if !found { + t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual.Sample) + } + } + } +} + +func TestProcessor001Process(t *testing.T) { + testProcessor001Process(t) +} + +func BenchmarkProcessor001Process(b *testing.B) { + for i := 0; i < b.N; i++ { + testProcessor001Process(b) + } +} diff --git a/retrieval/format/result.go b/retrieval/format/result.go new file mode 100644 index 0000000000..c04f744a9d --- /dev/null +++ b/retrieval/format/result.go @@ -0,0 +1,25 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package format + +import ( + "github.com/matttproud/prometheus/model" +) + +// Result encapsulates the outcome from processing a given sample from a +// source. +type Result struct { + Err error + Sample model.Sample +} diff --git a/retrieval/instrumentation.go b/retrieval/instrumentation.go index 0faeda55ca..ecbf231a3e 100644 --- a/retrieval/instrumentation.go +++ b/retrieval/instrumentation.go @@ -14,9 +14,9 @@ package retrieval import ( - "github.com/matttproud/golang_instrumentation" - "github.com/matttproud/golang_instrumentation/maths" - "github.com/matttproud/golang_instrumentation/metrics" + "github.com/prometheus/client_golang" + "github.com/prometheus/client_golang/maths" + "github.com/prometheus/client_golang/metrics" ) const ( @@ -38,6 +38,8 @@ var ( targetOperationLatencies = metrics.NewHistogram(networkLatencyHistogram) + // TODO: Include durations partitioned by target pool intervals. + targetOperations = metrics.NewCounter() ) diff --git a/retrieval/target.go b/retrieval/target.go index 3bb6c1fdd6..d0e0dc567a 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -13,16 +13,18 @@ package retrieval import ( - "encoding/json" "fmt" - "github.com/matttproud/golang_instrumentation/metrics" "github.com/matttproud/prometheus/model" - "io/ioutil" + "github.com/matttproud/prometheus/retrieval/format" + "github.com/prometheus/client_golang/metrics" "net/http" - "strconv" "time" ) +const ( + instance = "instance" +) + // The state of the given Target. type TargetState int @@ -64,7 +66,7 @@ type Target interface { // alluded to in the scheduledFor function, to use this as it wants to. The // current use case is to create a common batching time for scraping multiple // Targets in the future through the TargetPool. - Scrape(earliest time.Time, results chan Result) error + Scrape(earliest time.Time, results chan format.Result) error // Fulfill the healthReporter interface. State() TargetState // Report the soonest time at which this Target may be scheduled for @@ -115,15 +117,7 @@ func NewTarget(address string, interval, deadline time.Duration, baseLabels mode return target } -type Result struct { - Err error - Samples []model.Sample - Target Target -} - -func (t *target) Scrape(earliest time.Time, results chan Result) (err error) { - result := Result{} - +func (t *target) Scrape(earliest time.Time, results chan format.Result) (err error) { defer func() { futureState := t.state @@ -135,15 +129,15 @@ func (t *target) Scrape(earliest time.Time, results chan Result) (err error) { } t.scheduler.Reschedule(earliest, futureState) - - result.Err = err - results <- result }() done := make(chan bool) request := func() { - ti := time.Now() + defer func() { + done <- true + }() + resp, err := http.Get(t.Address()) if err != nil { return @@ -151,86 +145,22 @@ func (t *target) Scrape(earliest time.Time, results chan Result) (err error) { defer resp.Body.Close() - raw, err := ioutil.ReadAll(resp.Body) - if err != nil { - return - } - - intermediate := make(map[string]interface{}) - err = json.Unmarshal(raw, &intermediate) + processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header) if err != nil { return } - baseLabels := model.LabelSet{"instance": model.LabelValue(t.Address())} - for baseK, baseV := range t.BaseLabels { - baseLabels[baseK] = baseV + // XXX: This is a wart; we need to handle this more gracefully down the + // road, especially once we have service discovery support. + baseLabels := model.LabelSet{instance: model.LabelValue(t.Address())} + for baseLabel, baseValue := range t.BaseLabels { + baseLabels[baseLabel] = baseValue } - for name, v := range intermediate { - asMap, ok := v.(map[string]interface{}) - - if !ok { - continue - } - - switch asMap["type"] { - case "counter": - m := model.Metric{} - m["name"] = model.LabelValue(name) - asFloat, ok := asMap["value"].(float64) - if !ok { - continue - } - - s := model.Sample{ - Metric: m, - Value: model.SampleValue(asFloat), - Timestamp: ti, - } - - for baseK, baseV := range baseLabels { - m[baseK] = baseV - } - - result.Samples = append(result.Samples, s) - case "histogram": - values, ok := asMap["value"].(map[string]interface{}) - if !ok { - continue - } - - for p, pValue := range values { - asString, ok := pValue.(string) - if !ok { - continue - } - - float, err := strconv.ParseFloat(asString, 64) - if err != nil { - continue - } - - m := model.Metric{} - m["name"] = model.LabelValue(name) - m["percentile"] = model.LabelValue(p) - - s := model.Sample{ - Metric: m, - Value: model.SampleValue(float), - Timestamp: ti, - } - - for baseK, baseV := range baseLabels { - m[baseK] = baseV - } - - result.Samples = append(result.Samples, s) - } - } + err = processor.Process(resp.Body, baseLabels, results) + if err != nil { + return } - - done <- true } accumulator := func(d time.Duration) { diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go index f0f404ddf1..bfe5f4cacd 100644 --- a/retrieval/targetmanager.go +++ b/retrieval/targetmanager.go @@ -17,6 +17,7 @@ import ( "container/heap" "github.com/matttproud/prometheus/config" "github.com/matttproud/prometheus/model" + "github.com/matttproud/prometheus/retrieval/format" "log" "time" ) @@ -32,10 +33,10 @@ type TargetManager interface { type targetManager struct { requestAllowance chan bool pools map[time.Duration]*TargetPool - results chan Result + results chan format.Result } -func NewTargetManager(results chan Result, requestAllowance int) TargetManager { +func NewTargetManager(results chan format.Result, requestAllowance int) TargetManager { return &targetManager{ requestAllowance: make(chan bool, requestAllowance), results: results, diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go index 40e50dee6c..62c19fe258 100644 --- a/retrieval/targetmanager_test.go +++ b/retrieval/targetmanager_test.go @@ -14,6 +14,7 @@ package retrieval import ( + "github.com/matttproud/prometheus/retrieval/format" "github.com/matttproud/prometheus/utility/test" "testing" "time" @@ -34,7 +35,7 @@ func (t fakeTarget) Interval() time.Duration { return t.interval } -func (t *fakeTarget) Scrape(e time.Time, r chan Result) error { +func (t *fakeTarget) Scrape(e time.Time, r chan format.Result) error { t.scrapeCount++ return nil @@ -52,7 +53,7 @@ func (t *fakeTarget) scheduledFor() (time time.Time) { } func testTargetManager(t test.Tester) { - results := make(chan Result, 5) + results := make(chan format.Result, 5) targetManager := NewTargetManager(results, 3) target1GroupA := &fakeTarget{ diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go index 4f4ed54843..fed9ea8cfc 100644 --- a/retrieval/targetpool.go +++ b/retrieval/targetpool.go @@ -2,6 +2,7 @@ package retrieval import ( "container/heap" + "github.com/matttproud/prometheus/retrieval/format" "log" "time" ) @@ -44,7 +45,7 @@ func (p TargetPool) Swap(i, j int) { p.targets[i], p.targets[j] = p.targets[j], p.targets[i] } -func (p *TargetPool) Run(results chan Result, interval time.Duration) { +func (p *TargetPool) Run(results chan format.Result, interval time.Duration) { ticker := time.Tick(interval) for { @@ -62,14 +63,14 @@ func (p TargetPool) Stop() { p.done <- true } -func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t Target) { +func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) { p.manager.acquire() defer p.manager.release() t.Scrape(earliest, results) } -func (p *TargetPool) runIteration(results chan Result) { +func (p *TargetPool) runIteration(results chan format.Result) { for i := 0; i < p.Len(); i++ { target := heap.Pop(p).(Target) if target == nil { diff --git a/storage/metric/leveldb/instrumentation.go b/storage/metric/leveldb/instrumentation.go index 5a7f54d5b5..2bfcb65865 100644 --- a/storage/metric/leveldb/instrumentation.go +++ b/storage/metric/leveldb/instrumentation.go @@ -14,9 +14,9 @@ package leveldb import ( - "github.com/matttproud/golang_instrumentation" - "github.com/matttproud/golang_instrumentation/maths" - "github.com/matttproud/golang_instrumentation/metrics" + "github.com/prometheus/client_golang" + "github.com/prometheus/client_golang/maths" + "github.com/prometheus/client_golang/metrics" "time" ) diff --git a/utility/test/error.go b/utility/test/error.go new file mode 100644 index 0000000000..2426a50744 --- /dev/null +++ b/utility/test/error.go @@ -0,0 +1,31 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +// ErrorEqual compares Go errors for equality. +func ErrorEqual(left, right error) bool { + if left == right { + return true + } + + if left != nil && right != nil { + if left.Error() == right.Error() { + return true + } + + return false + } + + return false +}