The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/tsdb/cloud-monitoring/resource_handler.go

409 lines
11 KiB

package cloudmonitoring
import (
"bytes"
"compress/flate"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"time"
"github.com/andybalholm/brotli"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
)
// nameExp matches the final '/'-separated segment of a resource name,
// together with any '/' characters that trail it. processServices and
// processSLOs use FindString with it to derive the selectable value.
var nameExp = regexp.MustCompile(`([^\/]*)\/*$`)
// resourceManagerPath is the upstream path getTarget returns for the
// "/projects" resource route.
const resourceManagerPath = "/v1/projects"
// processResponse turns one decoded upstream response body into the items to
// return to the caller, the token of the next page ("" ends pagination in
// getResponses), and an error if the body could not be processed.
type processResponse func(body []byte) ([]json.RawMessage, string, error)
// newResourceMux returns a mux with every resource route of the service
// registered: the GCE default project lookup plus the proxied metric
// descriptor, service, SLO and project listings.
func (s *Service) newResourceMux() *http.ServeMux {
	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
	}{
		{"/gceDefaultProject", s.getGCEDefaultProject},
		{"/metricDescriptors/", s.handleResourceReq(cloudMonitor, processMetricDescriptors)},
		{"/services/", s.handleResourceReq(cloudMonitor, processServices)},
		{"/slo-services/", s.handleResourceReq(cloudMonitor, processSLOs)},
		{"/projects", s.handleResourceReq(resourceManager, processProjects)},
	}

	mux := http.NewServeMux()
	for _, route := range routes {
		mux.HandleFunc(route.pattern, route.handler)
	}
	return mux
}
// getGCEDefaultProject writes the JSON-encoded default GCE project obtained
// from s.gceDefaultProjectGetter. Both a lookup failure and a marshaling
// failure are reported to the client with a 400 status.
func (s *Service) getGCEDefaultProject(rw http.ResponseWriter, req *http.Request) {
	fail := func(msg string) {
		writeResponse(rw, http.StatusBadRequest, msg)
	}

	project, lookupErr := s.gceDefaultProjectGetter(req.Context(), resourceManagerScope)
	if lookupErr != nil {
		fail(fmt.Sprintf("unexpected error %v", lookupErr))
		return
	}

	payload, marshalErr := json.Marshal(project)
	if marshalErr != nil {
		fail(fmt.Sprintf("error retrieving default project %v", marshalErr))
		return
	}

	writeResponseBytes(rw, http.StatusOK, payload)
}
// handleResourceReq builds the HTTP handler for one proxied resource route.
// The handler retargets the incoming request at the given sub data source via
// setRequestVariables and then lets getResources fetch and write the result,
// using responseFn to process each upstream page.
func (s *Service) handleResourceReq(subDataSource string, responseFn processResponse) func(rw http.ResponseWriter, req *http.Request) {
	handler := func(rw http.ResponseWriter, req *http.Request) {
		client, status, setupErr := s.setRequestVariables(req, subDataSource)
		if setupErr == nil {
			getResources(rw, req, client, responseFn)
			return
		}
		writeResponse(rw, status, fmt.Sprintf("unexpected error %v", setupErr))
	}
	return handler
}
// getResources fetches every page of the upstream resource with cli, rebuilds
// a single response body from the processed items and writes it to rw.
//
// The upstream headers returned by getResponses are copied onto rw BEFORE the
// status line is written: per the net/http contract, header changes made after
// WriteHeader have no effect, so copying them afterwards would silently drop
// them (including the Content-Encoding that describes the body written here).
// The upstream Content-Length is not copied because the body is re-assembled
// and re-encoded locally and its length generally differs.
//
// It returns rw so callers can inspect the writer; errors are reported to the
// client through writeResponse.
func getResources(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter {
	if responseFn == nil {
		writeResponse(rw, http.StatusInternalServerError, "responseFn should not be nil")
		return rw
	}

	responses, headers, encoding, code, err := getResponses(req, cli, responseFn)
	if err != nil {
		writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
		return rw
	}

	body, err := buildResponse(responses, encoding)
	if err != nil {
		writeResponse(rw, http.StatusInternalServerError, fmt.Sprintf("error formatting responose %v", err))
		return rw
	}

	dst := rw.Header()
	for key, values := range headers {
		// Skip keys without values (nothing to copy) and the stale length of
		// the upstream body.
		if len(values) == 0 || http.CanonicalHeaderKey(key) == "Content-Length" {
			continue
		}
		dst.Del(key)
		for _, value := range values {
			dst.Add(key, value)
		}
	}

	writeResponseBytes(rw, code, body)
	return rw
}
// processMetricDescriptors parses a metric descriptor listing and returns one
// JSON document per descriptor together with the next-page token.
//
// Each descriptor is enriched before being re-marshaled: Service is the part
// of Type before the first '/', ServiceShortName is the part of Service before
// the first '.', and an empty DisplayName falls back to Type.
func processMetricDescriptors(body []byte) ([]json.RawMessage, string, error) {
	var parsed metricDescriptorResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, "", err
	}

	descriptors := parsed.Descriptors
	// Non-nil even when empty so the final payload marshals as [] not null.
	encoded := make([]json.RawMessage, 0, len(descriptors))
	for idx := range descriptors {
		service := strings.SplitN(descriptors[idx].Type, "/", 2)[0]
		descriptors[idx].Service = service
		descriptors[idx].ServiceShortName = strings.SplitN(service, ".", 2)[0]
		if descriptors[idx].DisplayName == "" {
			descriptors[idx].DisplayName = descriptors[idx].Type
		}

		raw, err := json.Marshal(descriptors[idx])
		if err != nil {
			return nil, "", err
		}
		encoded = append(encoded, raw)
	}
	return encoded, parsed.Token, nil
}
// processServices parses a service listing and returns one selectable value
// per service together with the next-page token.
//
// The value is the nameExp match of the service name; the label is the
// display name, falling back to the value when the display name is empty.
// A service whose name yields an empty match aborts processing with an error.
func processServices(body []byte) ([]json.RawMessage, string, error) {
	var parsed serviceResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, "", err
	}

	// Non-nil even when empty so the final payload marshals as [] not null.
	options := make([]json.RawMessage, 0, len(parsed.Services))
	for _, svc := range parsed.Services {
		id := nameExp.FindString(svc.Name)
		if id == "" {
			return nil, "", fmt.Errorf("unexpected service name: %v", svc.Name)
		}

		label := svc.DisplayName
		if label == "" {
			label = id
		}

		raw, err := json.Marshal(selectableValue{Value: id, Label: label})
		if err != nil {
			return nil, "", err
		}
		options = append(options, raw)
	}
	return options, parsed.Token, nil
}
// processSLOs parses an SLO listing and returns one selectable value per SLO
// (value from the nameExp match of its name, label from its display name,
// goal copied through) together with the next-page token. An SLO whose name
// yields an empty match aborts processing with an error.
func processSLOs(body []byte) ([]json.RawMessage, string, error) {
	var parsed sloResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, "", err
	}

	// Non-nil even when empty so the final payload marshals as [] not null.
	options := make([]json.RawMessage, 0, len(parsed.SLOs))
	for _, entry := range parsed.SLOs {
		id := nameExp.FindString(entry.Name)
		if id == "" {
			return nil, "", fmt.Errorf("unexpected service name: %v", entry.Name)
		}

		option := selectableValue{
			Value: id,
			Label: entry.DisplayName,
			Goal:  entry.Goal,
		}
		raw, err := json.Marshal(option)
		if err != nil {
			return nil, "", err
		}
		options = append(options, raw)
	}
	return options, parsed.Token, nil
}
// processProjects parses a project listing and returns one selectable value
// per project (value = project ID, label = project name) together with the
// next-page token.
func processProjects(body []byte) ([]json.RawMessage, string, error) {
	var parsed projectResponse
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, "", err
	}

	// Non-nil even when empty so the final payload marshals as [] not null.
	options := make([]json.RawMessage, 0, len(parsed.Projects))
	for _, p := range parsed.Projects {
		option := selectableValue{Value: p.ProjectID, Label: p.Name}
		raw, err := json.Marshal(option)
		if err != nil {
			return nil, "", err
		}
		options = append(options, raw)
	}
	return options, parsed.Token, nil
}
// decode reads original to the end and returns its content with the given
// content encoding removed.
//
// Supported encodings are "gzip", "deflate" (read with compress/flate),
// "br" and "" (identity). Any other value yields an error naming the
// unsupported encoding. The gzip and flate decompressors are closed before
// returning (a close failure is only logged); original itself is NOT closed
// here and remains the caller's responsibility.
//
// NOTE(review): compress/flate reads raw DEFLATE data; confirm upstream never
// sends zlib-wrapped "deflate" bodies.
func decode(encoding string, original io.ReadCloser) ([]byte, error) {
	// closeQuietly closes a decompressor; a failure at that point cannot
	// change the bytes already read, so it is logged instead of returned.
	closeQuietly := func(c io.Closer) {
		if err := c.Close(); err != nil {
			slog.Warn("Failed to close reader body", "err", err)
		}
	}

	var reader io.Reader
	switch encoding {
	case "gzip":
		gz, err := gzip.NewReader(original)
		if err != nil {
			return nil, err
		}
		defer closeQuietly(gz)
		reader = gz
	case "deflate":
		fl := flate.NewReader(original)
		defer closeQuietly(fl)
		reader = fl
	case "br":
		reader = brotli.NewReader(original)
	case "":
		reader = original
	default:
		// Report the offending encoding (the previous code formatted a nil
		// error here, producing "unexpected encoding type <nil>").
		return nil, fmt.Errorf("unexpected encoding type %v", encoding)
	}

	body, err := io.ReadAll(reader)
	if err != nil {
		return nil, err
	}
	return body, nil
}
// encode returns body compressed with the given content encoding.
//
// Supported encodings are "gzip", "deflate" (compress/flate at the default
// compression level), "br" and "" (identity, body is returned unchanged).
// Any other value yields an error naming the unsupported encoding.
//
// Closing a compressor flushes its pending data, so a Close failure means the
// buffer is incomplete; it is therefore returned as an error instead of being
// logged while handing back truncated output.
func encode(encoding string, body []byte) ([]byte, error) {
	buf := new(bytes.Buffer)
	var writer io.Writer = buf

	switch encoding {
	case "gzip":
		writer = gzip.NewWriter(buf)
	case "deflate":
		fw, err := flate.NewWriter(buf, flate.DefaultCompression)
		if err != nil {
			return nil, err
		}
		writer = fw
	case "br":
		writer = brotli.NewWriter(buf)
	case "":
		// Identity: write straight into the buffer.
	default:
		return nil, fmt.Errorf("unexpected encoding type %v", encoding)
	}

	_, writeErr := writer.Write(body)

	// Always close the compressor, even after a failed write, so it is not
	// left dangling. The plain buffer has no Close method and is skipped.
	var closeErr error
	if closer, ok := writer.(io.Closer); ok {
		closeErr = closer.Close()
	}

	if writeErr != nil {
		return nil, fmt.Errorf("unable to encode response %w", writeErr)
	}
	if closeErr != nil {
		return nil, fmt.Errorf("unable to encode response %w", closeErr)
	}
	return buf.Bytes(), nil
}
// processData decodes an upstream response body and hands it to responseFn.
//
// It returns the processed items, the next-page token, a status code and an
// error. The status code is only meaningful when the error is non-nil: 400 if
// the body could not be decoded, 500 if responseFn rejected it; on success it
// is 0.
func processData(data io.ReadCloser, encoding string, responseFn processResponse) ([]json.RawMessage, string, int, error) {
	raw, decodeErr := decode(encoding, data)
	if decodeErr != nil {
		return nil, "", http.StatusBadRequest, fmt.Errorf("unable to decode response %v", decodeErr)
	}

	items, nextToken, processErr := responseFn(raw)
	if processErr != nil {
		return nil, "", http.StatusInternalServerError, fmt.Errorf("data processing error %v", processErr)
	}

	return items, nextToken, 0, nil
}
// apiResponse is the outcome of a single upstream request issued by doRequest.
type apiResponse struct {
// encoding is the Content-Encoding header value of the upstream response.
encoding string
// header holds the upstream response headers.
header http.Header
// responses holds the items produced by the processResponse callback.
responses []json.RawMessage
// token is the next-page token; getResponses stops paginating when it is "".
token string
// code is the upstream status code, or the code reported by processData
// (or 400 for a transport failure) when err is set.
code int
// err is non-nil when the request or the processing of its body failed.
err error
}
// doRequest executes req with cli and processes the response body through
// processData using responseFn.
//
// A transport failure is reported as an apiResponse carrying the error and a
// 400 code. Otherwise the result carries the upstream encoding, headers and
// status code; if processing fails, the status code is replaced by the code
// processData reported and err is set. The response body is always closed.
func doRequest(req *http.Request, cli *http.Client, responseFn processResponse) *apiResponse {
	res, doErr := cli.Do(req)
	if doErr != nil {
		return &apiResponse{err: doErr, code: http.StatusBadRequest}
	}
	defer func() {
		if closeErr := res.Body.Close(); closeErr != nil {
			slog.Warn("Failed to close response body", "err", closeErr)
		}
	}()

	out := &apiResponse{
		encoding: res.Header.Get("Content-Encoding"),
		header:   res.Header,
		code:     res.StatusCode,
	}

	var failureCode int
	out.responses, out.token, failureCode, out.err = processData(res.Body, out.encoding, responseFn)
	if out.err != nil {
		out.code = failureCode
	}
	return out
}
// getResponses issues req and keeps following the returned page token (sent
// as the "pageToken" query parameter on the same request) until no token is
// returned, concatenating the processed items of every page.
//
// It returns the collected items plus the headers, encoding and status code
// of the FIRST response. A "Server-Timing" header listing the duration in
// seconds of every upstream call is set on those headers. On any failure it
// returns the failing call's code and error and no data.
func getResponses(req *http.Request, cli *http.Client, responseFn processResponse) ([]json.RawMessage, http.Header, string, int, error) {
	var timings strings.Builder
	recordSince := func(begin time.Time) {
		timings.WriteByte(' ')
		timings.WriteString(strconv.FormatFloat(time.Since(begin).Seconds(), 'f', 10, 64))
	}

	begin := time.Now()
	first := doRequest(req, cli, responseFn)
	recordSince(begin)
	if first.err != nil {
		return nil, nil, "", first.code, first.err
	}

	// Start from the first page's slice so an empty, non-nil result is
	// returned as-is.
	collected := first.responses
	for next := first.token; next != ""; {
		begin = time.Now()
		params := req.URL.Query()
		params.Set("pageToken", next)
		req.URL.RawQuery = params.Encode()

		page := doRequest(req, cli, responseFn)
		recordSince(begin)
		if page.err != nil {
			return nil, nil, "", page.code, page.err
		}

		collected = append(collected, page.responses...)
		next = page.token
	}

	first.header.Set("Server-Timing", fmt.Sprintf("doRequest;dur=%v", timings.String()))
	return collected, first.header, first.encoding, first.code, nil
}
// buildResponse marshals responses into a single JSON array and passes the
// result through encode with the given content encoding. A marshaling
// failure is returned as a "response marshaling error".
func buildResponse(responses []json.RawMessage, encoding string) ([]byte, error) {
	marshaled, marshalErr := json.Marshal(responses)
	if marshalErr == nil {
		return encode(encoding, marshaled)
	}
	return nil, fmt.Errorf("response marshaling error %v", marshalErr)
}
// setRequestVariables retargets req at the upstream API of subDataSource and
// returns the HTTP client configured for that sub data source.
//
// The request path is rewritten with getTarget, and the URL host and scheme
// are taken from the sub data source's configured URL. On failure it returns
// a nil client, the HTTP status to report (400) and the error; on success the
// status is 0.
//
// A nil datasource returned without an error is rejected explicitly instead
// of being dereferenced, which would panic.
func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (*http.Client, int, error) {
	slog.Debug("Received resource call", "url", req.URL.String(), "method", req.Method)

	newPath, err := getTarget(req.URL.Path)
	if err != nil {
		return nil, http.StatusBadRequest, err
	}

	dsInfo, err := s.getDataSourceFromHTTPReq(req)
	if err != nil {
		return nil, http.StatusBadRequest, err
	}
	if dsInfo == nil {
		return nil, http.StatusBadRequest, fmt.Errorf("unable to resolve datasource for resource request")
	}

	// Look the sub data source up once; it supplies both the URL and client.
	service := dsInfo.services[subDataSource]
	serviceURL, err := url.Parse(service.url)
	if err != nil {
		return nil, http.StatusBadRequest, err
	}

	req.URL.Path = newPath
	req.URL.Host = serviceURL.Host
	req.URL.Scheme = serviceURL.Scheme
	return service.client, 0, nil
}
// getTarget maps the path of an incoming resource request to the path to
// call upstream. "/projects" maps to resourceManagerPath; any other path has
// its first segment (the resource route) removed, so "/<route>/<rest>"
// becomes "/<rest>". A path without a second '/' is rejected with an error.
func getTarget(original string) (target string, err error) {
	if original == "/projects" {
		return resourceManagerPath, nil
	}

	parts := strings.SplitN(original, "/", 3)
	if len(parts) >= 3 {
		return "/" + parts[2], nil
	}
	return "", fmt.Errorf("the request should contain the service on its path")
}
// writeResponseBytes sends the status code followed by msg as the response
// body. A failed body write is logged; nothing is returned to the caller.
func writeResponseBytes(rw http.ResponseWriter, code int, msg []byte) {
	rw.WriteHeader(code)
	if _, writeErr := rw.Write(msg); writeErr != nil {
		slog.Error("Unable to write HTTP response", "error", writeErr)
	}
}
// writeResponse is the string flavour of writeResponseBytes: it sends code
// and uses msg as the response body.
func writeResponse(rw http.ResponseWriter, code int, msg string) {
	payload := []byte(msg)
	writeResponseBytes(rw, code, payload)
}
// getDataSourceFromHTTPReq resolves the datasource instance for the plugin
// configuration carried in the request context.
//
// It returns the instance manager's error when the lookup fails (previously
// that error was dropped and (nil, nil) was returned, leaving callers with a
// nil datasource and no indication of failure), and an error when the
// instance is not a *datasourceInfo.
func (s *Service) getDataSourceFromHTTPReq(req *http.Request) (*datasourceInfo, error) {
	ctx := req.Context()
	pluginContext := httpadapter.PluginConfigFromContext(ctx)

	instance, err := s.im.Get(ctx, pluginContext)
	if err != nil {
		return nil, err
	}

	ds, ok := instance.(*datasourceInfo)
	if !ok {
		return nil, fmt.Errorf("unable to convert datasource from service instance")
	}
	return ds, nil
}