The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
grafana/pkg/tsdb/cloudmonitoring/resource_handler.go

304 lines
8.0 KiB

package cloudmonitoring
import (
"bytes"
"compress/flate"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strings"
"github.com/andybalholm/brotli"
"github.com/grafana/grafana-google-sdk-go/pkg/utils"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
)
// nameExp matches the part after the last '/' symbol
var nameExp = regexp.MustCompile(`([^\/]*)\/*$`)
const resourceManagerPath = "/v1/projects"
type processResponse func(body []byte) ([]byte, error)
func (s *Service) registerRoutes(mux *http.ServeMux) {
mux.HandleFunc("/gceDefaultProject", getGCEDefaultProject)
mux.HandleFunc("/metricDescriptors/", s.resourceHandler(cloudMonitor, processMetricDescriptors))
mux.HandleFunc("/services/", s.resourceHandler(cloudMonitor, processServices))
mux.HandleFunc("/slo-services/", s.resourceHandler(cloudMonitor, processSLOs))
mux.HandleFunc("/projects", s.resourceHandler(resourceManager, processProjects))
}
func getGCEDefaultProject(rw http.ResponseWriter, req *http.Request) {
project, err := utils.GCEDefaultProject(req.Context())
if err != nil {
writeResponse(rw, http.StatusBadRequest, fmt.Sprintf("unexpected error %v", err))
return
}
writeResponse(rw, http.StatusOK, project)
}
func (s *Service) resourceHandler(subDataSource string, responseFn processResponse) func(rw http.ResponseWriter, req *http.Request) {
return func(rw http.ResponseWriter, req *http.Request) {
client, code, err := s.setRequestVariables(req, subDataSource)
if err != nil {
writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
return
}
s.doRequest(rw, req, client, responseFn)
}
}
func (s *Service) doRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter {
res, err := cli.Do(req)
if err != nil {
writeResponse(rw, http.StatusBadRequest, fmt.Sprintf("unexpected error %v", err))
return rw
}
defer func() {
if err := res.Body.Close(); err != nil {
slog.Warn("Failed to close response body", "err", err)
}
}()
if responseFn == nil {
writeResponse(rw, http.StatusInternalServerError, "responseFn should not be nil")
return rw
}
body, code, err := processData(res, responseFn)
if err != nil {
writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
return rw
}
writeResponseBytes(rw, res.StatusCode, body)
for k, v := range res.Header {
rw.Header().Set(k, v[0])
for _, v := range v[1:] {
rw.Header().Add(k, v)
}
}
return rw
}
func processMetricDescriptors(body []byte) ([]byte, error) {
resp := metricDescriptorResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
for i := range resp.Descriptors {
resp.Descriptors[i].Service = strings.SplitN(resp.Descriptors[i].Type, "/", 2)[0]
resp.Descriptors[i].ServiceShortName = strings.SplitN(resp.Descriptors[i].Service, ".", 2)[0]
if resp.Descriptors[i].DisplayName == "" {
resp.Descriptors[i].DisplayName = resp.Descriptors[i].Type
}
}
return json.Marshal(resp.Descriptors)
}
func processServices(body []byte) ([]byte, error) {
resp := serviceResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
values := []selectableValue{}
for _, service := range resp.Services {
name := nameExp.FindString(service.Name)
if name == "" {
return nil, fmt.Errorf("unexpected service name: %v", service.Name)
}
label := service.DisplayName
if label == "" {
label = name
}
values = append(values, selectableValue{
Value: name,
Label: label,
})
}
return json.Marshal(values)
}
func processSLOs(body []byte) ([]byte, error) {
resp := sloResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
values := []selectableValue{}
for _, slo := range resp.SLOs {
name := nameExp.FindString(slo.Name)
if name == "" {
return nil, fmt.Errorf("unexpected service name: %v", slo.Name)
}
values = append(values, selectableValue{
Value: name,
Label: slo.DisplayName,
Goal: slo.Goal,
})
}
return json.Marshal(values)
}
func processProjects(body []byte) ([]byte, error) {
resp := projectResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
values := []selectableValue{}
for _, project := range resp.Projects {
values = append(values, selectableValue{
Value: project.ProjectID,
Label: project.Name,
})
}
return json.Marshal(values)
}
func processData(res *http.Response, responseFn processResponse) ([]byte, int, error) {
encoding := res.Header.Get("Content-Encoding")
var reader io.Reader
var err error
switch encoding {
case "gzip":
reader, err = gzip.NewReader(res.Body)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err)
}
defer func() {
if err := reader.(io.ReadCloser).Close(); err != nil {
slog.Warn("Failed to close reader body", "err", err)
}
}()
case "deflate":
reader = flate.NewReader(res.Body)
defer func() {
if err := reader.(io.ReadCloser).Close(); err != nil {
slog.Warn("Failed to close reader body", "err", err)
}
}()
case "br":
reader = brotli.NewReader(res.Body)
case "":
reader = res.Body
default:
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", err)
}
body, err := ioutil.ReadAll(reader)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err)
}
body, err = responseFn(body)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("data processing error %v", err)
}
buf := new(bytes.Buffer)
var writer io.Writer = buf
switch encoding {
case "gzip":
writer = gzip.NewWriter(writer)
case "deflate":
writer, err = flate.NewWriter(writer, -1)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected error %v", err)
}
case "br":
writer = brotli.NewWriter(writer)
case "":
default:
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", encoding)
}
_, err = writer.Write(body)
if writeCloser, ok := writer.(io.WriteCloser); ok {
if err := writeCloser.Close(); err != nil {
slog.Warn("Failed to close writer body", "err", err)
}
}
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("unable to encode response %v", err)
}
return buf.Bytes(), 0, nil
}
func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (*http.Client, int, error) {
slog.Debug("Received resource call", "url", req.URL.String(), "method", req.Method)
newPath, err := getTarget(req.URL.Path)
if err != nil {
return nil, http.StatusBadRequest, err
}
dsInfo, err := s.getDataSourceFromHTTPReq(req)
if err != nil {
return nil, http.StatusBadRequest, err
}
serviceURL, err := url.Parse(dsInfo.services[subDataSource].url)
if err != nil {
return nil, http.StatusBadRequest, err
}
req.URL.Path = newPath
req.URL.Host = serviceURL.Host
req.URL.Scheme = serviceURL.Scheme
return dsInfo.services[subDataSource].client, 0, nil
}
func getTarget(original string) (target string, err error) {
if original == "/projects" {
return resourceManagerPath, nil
}
splittedPath := strings.SplitN(original, "/", 3)
if len(splittedPath) < 3 {
err = fmt.Errorf("the request should contain the service on its path")
return
}
target = fmt.Sprintf("/%s", splittedPath[2])
return
}
func writeResponseBytes(rw http.ResponseWriter, code int, msg []byte) {
rw.WriteHeader(code)
_, err := rw.Write(msg)
if err != nil {
slog.Error("Unable to write HTTP response", "error", err)
}
}
func writeResponse(rw http.ResponseWriter, code int, msg string) {
writeResponseBytes(rw, code, []byte(msg))
}
func (s *Service) getDataSourceFromHTTPReq(req *http.Request) (*datasourceInfo, error) {
ctx := req.Context()
pluginContext := httpadapter.PluginConfigFromContext(ctx)
i, err := s.im.Get(pluginContext)
if err != nil {
return nil, nil
}
ds, ok := i.(*datasourceInfo)
if !ok {
return nil, fmt.Errorf("unable to convert datasource from service instance")
}
return ds, nil
}