The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/registry/apis/provisioning/webhook.go

169 lines
4.8 KiB

package provisioning
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana/pkg/apimachinery/identity"
provisioning "github.com/grafana/grafana/pkg/apis/provisioning/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/jobs"
"github.com/grafana/grafana/pkg/registry/apis/provisioning/repository"
)
// Webhook endpoint max size (25MB)
// See https://docs.github.com/en/webhooks/webhook-events-and-payloads
const webhookMaxBodySize = 25 * 1024 * 1024
// This only works for github right now
type webhookConnector struct {
client ClientGetter
getter RepoGetter
jobs jobs.Queue
webhooksEnabled bool
}
func NewWebhookConnector(client ClientGetter, getter RepoGetter, jobs jobs.Queue, webhooksEnabled bool) *webhookConnector {
return &webhookConnector{
client: client,
getter: getter,
jobs: jobs,
webhooksEnabled: webhooksEnabled,
}
}
func (*webhookConnector) New() runtime.Object {
return &provisioning.WebhookResponse{}
}
func (*webhookConnector) Destroy() {}
func (*webhookConnector) ProducesMIMETypes(verb string) []string {
return []string{"application/json"}
}
func (*webhookConnector) ProducesObject(verb string) any {
return &provisioning.WebhookResponse{}
}
func (*webhookConnector) ConnectMethods() []string {
return []string{
http.MethodPost,
http.MethodGet, // only useful for browser testing, should be removed
}
}
func (*webhookConnector) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, ""
}
func (s *webhookConnector) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
namespace := request.NamespaceValue(ctx)
ctx, _, err := identity.WithProvisioningIdentity(ctx, namespace)
if err != nil {
return nil, err
}
// Get the repository with the worker identity (since the request user is likely anonymous)
repo, err := s.getter.GetHealthyRepository(ctx, name)
if err != nil {
return nil, err
}
return withTimeout(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger := logging.FromContext(r.Context()).With("logger", "webhook-connector", "repo", name)
ctx := logging.Context(r.Context(), logger)
if !s.webhooksEnabled {
responder.Error(errors.NewBadRequest("webhooks are not enabled"))
return
}
hooks, ok := repo.(repository.Hooks)
if !ok {
responder.Error(errors.NewBadRequest("the repository does not support webhooks"))
return
}
// Limit the webhook request body size
r.Body = http.MaxBytesReader(w, r.Body, webhookMaxBodySize)
rsp, err := hooks.Webhook(ctx, r)
if err != nil {
responder.Error(err)
return
}
if rsp == nil {
responder.Error(fmt.Errorf("expecting a response"))
return
}
if err := s.updateLastEvent(ctx, repo, name, namespace); err != nil {
// Continue processing as this is non-critical; the update is purely informational
logger.Error("failed to update last event", "error", err)
}
if rsp.Job != nil {
rsp.Job.Repository = name
job, err := s.jobs.Insert(ctx, namespace, *rsp.Job)
if err != nil {
responder.Error(err)
return
}
responder.Object(rsp.Code, job)
return
}
responder.Object(rsp.Code, rsp)
}), 30*time.Second), nil
}
// updateLastEvent updates the last event time for the webhook
// This is to provide some visibility that the webhook is still active and working
// It's not a good idea to update the webhook status too often, so we only update it if it's been a while
func (s *webhookConnector) updateLastEvent(ctx context.Context, repo repository.Repository, name, namespace string) error {
client := s.client.GetClient()
if client == nil {
// This would only happen if we wired things up incorrectly
return fmt.Errorf("client is nil")
}
lastEvent := time.UnixMilli(repo.Config().Status.Webhook.LastEvent)
eventAge := time.Since(lastEvent)
if repo.Config().Status.Webhook != nil && (eventAge > time.Minute) {
patchOp := map[string]interface{}{
"op": "replace",
"path": "/status/webhook/lastEvent",
"value": time.Now().UnixMilli(),
}
patch, err := json.Marshal([]map[string]interface{}{patchOp})
if err != nil {
return fmt.Errorf("marshal patch: %w", err)
}
if _, err = client.Repositories(namespace).
Patch(ctx, name, types.JSONPatchType, patch, metav1.PatchOptions{}, "status"); err != nil {
return fmt.Errorf("patch status: %w", err)
}
}
return nil
}
var (
_ rest.Storage = (*webhookConnector)(nil)
_ rest.Connecter = (*webhookConnector)(nil)
_ rest.StorageMetadata = (*webhookConnector)(nil)
)