From 02c67bf879689c438c508b4637c67378ac29e784 Mon Sep 17 00:00:00 2001 From: Eran Raichstein Date: Thu, 26 Aug 2021 09:13:16 +0300 Subject: [PATCH] fix manifests and mutate function (#72) --- internal/manifests/compactor.go | 24 +++++++++++++++---- internal/manifests/distributor.go | 23 ++++++++++++++---- internal/manifests/ingester.go | 25 +++++++++++++++---- internal/manifests/memberlist.go | 9 ++++--- internal/manifests/mutate.go | 36 +++++++++++++++++++++------- internal/manifests/querier.go | 24 +++++++++++++++---- internal/manifests/query-frontend.go | 22 +++++++++++++---- internal/manifests/var.go | 15 ++++++++---- 8 files changed, 140 insertions(+), 38 deletions(-) diff --git a/internal/manifests/compactor.go b/internal/manifests/compactor.go index 16c7f59c04..f2fbe38773 100644 --- a/internal/manifests/compactor.go +++ b/internal/manifests/compactor.go @@ -40,6 +40,7 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet { Name: configVolumeName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ + DefaultMode: &defaultConfigMapMode, LocalObjectReference: corev1.LocalObjectReference{ Name: lokiConfigMapName(opts.Name), }, @@ -68,8 +69,11 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet { Scheme: corev1.URISchemeHTTP, }, }, + PeriodSeconds: 10, InitialDelaySeconds: 15, TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, }, LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ @@ -82,15 +86,18 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet { TimeoutSeconds: 2, PeriodSeconds: 30, FailureThreshold: 10, + SuccessThreshold: 1, }, Ports: []corev1.ContainerPort{ { Name: "metrics", ContainerPort: httpPort, + Protocol: protocolTCP, }, { Name: "grpc", ContainerPort: grpcPort, + Protocol: protocolTCP, }, }, VolumeMounts: []corev1.VolumeMount{ @@ -105,6 +112,9 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet { MountPath: dataDirectory, }, }, + TerminationMessagePath: "/dev/termination-log", + TerminationMessagePolicy: "File", + ImagePullPolicy: "IfNotPresent", }, }, } @@ -116,7 +126,6 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet { l := ComponentLabels(LabelCompactorComponent, opts.Name) a := commonAnnotations(opts.ConfigSHA1) - return &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", @@ -157,6 +166,7 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet { corev1.ResourceStorage: opts.ResourceRequirements.Compactor.PVCSize, }, }, + VolumeMode: &volumeFileSystemMode, StorageClassName: pointer.StringPtr(opts.Stack.StorageClassName), }, }, @@ -182,8 +192,10 @@ func NewCompactorGRPCService(opts Options) *corev1.Service { ClusterIP: "None", Ports: []corev1.ServicePort{ { - Name: "grpc", - Port: grpcPort, + Name: "grpc", + Port: grpcPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: grpcPort}, }, }, Selector: l, @@ -210,8 +222,10 @@ func NewCompactorHTTPService(opts Options) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: "metrics", - Port: httpPort, + Name: "metrics", + Port: httpPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: httpPort}, }, }, Selector: l, diff --git a/internal/manifests/distributor.go b/internal/manifests/distributor.go index 4f1571ad0b..90144c8171 100644 --- a/internal/manifests/distributor.go +++ b/internal/manifests/distributor.go @@ -45,6 +45,7 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment { Name: configVolumeName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ + DefaultMode: &defaultConfigMapMode, LocalObjectReference: corev1.LocalObjectReference{ Name: lokiConfigMapName(opts.Name), }, @@ -79,8 +80,11 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment { Scheme: corev1.URISchemeHTTP, }, }, + PeriodSeconds: 10, InitialDelaySeconds: 15, TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, }, LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ @@ -93,19 +97,23 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment { TimeoutSeconds: 2, PeriodSeconds: 30, FailureThreshold: 10, + SuccessThreshold: 1, }, Ports: []corev1.ContainerPort{ { Name: "metrics", ContainerPort: httpPort, + Protocol: protocolTCP, }, { Name: "grpc", ContainerPort: grpcPort, + Protocol: protocolTCP, }, { Name: "gossip-ring", ContainerPort: gossipPort, + Protocol: protocolTCP, }, }, VolumeMounts: []corev1.VolumeMount{ @@ -120,6 +128,9 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment { MountPath: dataDirectory, }, }, + TerminationMessagePath: "/dev/termination-log", + TerminationMessagePolicy: "File", + ImagePullPolicy: "IfNotPresent", }, }, } @@ -178,8 +189,10 @@ func NewDistributorGRPCService(opts Options) *corev1.Service { ClusterIP: "None", Ports: []corev1.ServicePort{ { - Name: "grpc", - Port: grpcPort, + Name: "grpc", + Port: grpcPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: grpcPort}, }, }, Selector: l, @@ -206,8 +219,10 @@ func NewDistributorHTTPService(opts Options) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: "metrics", - Port: httpPort, + Name: "metrics", + Port: httpPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: httpPort}, }, }, Selector: l, diff --git a/internal/manifests/ingester.go b/internal/manifests/ingester.go index ac4db5fdfa..cfb841748b 100644 --- a/internal/manifests/ingester.go +++ b/internal/manifests/ingester.go @@ -39,6 +39,7 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet { Name: configVolumeName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ + DefaultMode: &defaultConfigMapMode, LocalObjectReference: corev1.LocalObjectReference{ Name: lokiConfigMapName(opts.Name), }, @@ -67,8 +68,11 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet { Scheme: corev1.URISchemeHTTP, }, }, + PeriodSeconds: 10, InitialDelaySeconds: 15, TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, }, LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ @@ -81,19 +85,23 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet { TimeoutSeconds: 2, PeriodSeconds: 30, FailureThreshold: 10, + SuccessThreshold: 1, }, Ports: []corev1.ContainerPort{ { Name: "metrics", ContainerPort: httpPort, + Protocol: protocolTCP, }, { Name: "grpc", ContainerPort: grpcPort, + Protocol: protocolTCP, }, { Name: "gossip-ring", ContainerPort: gossipPort, + Protocol: protocolTCP, }, }, VolumeMounts: []corev1.VolumeMount{ @@ -108,6 +116,9 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet { MountPath: dataDirectory, }, }, + TerminationMessagePath: "/dev/termination-log", + TerminationMessagePolicy: "File", + ImagePullPolicy: "IfNotPresent", }, }, } @@ -119,7 +130,6 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet { l := ComponentLabels(LabelIngesterComponent, opts.Name) a := commonAnnotations(opts.ConfigSHA1) - return &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", @@ -161,6 +171,7 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet { }, }, StorageClassName: pointer.StringPtr(opts.Stack.StorageClassName), + VolumeMode: &volumeFileSystemMode, }, }, }, @@ -185,8 +196,10 @@ func NewIngesterGRPCService(opts Options) *corev1.Service { ClusterIP: "None", Ports: []corev1.ServicePort{ { - Name: "grpc", - Port: grpcPort, + Name: "grpc", + Port: grpcPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: grpcPort}, }, }, Selector: l, @@ -213,8 +226,10 @@ func NewIngesterHTTPService(opts Options) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: "metrics", - Port: httpPort, + Name: "metrics", + Port: httpPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: httpPort}, }, }, Selector: l, diff --git a/internal/manifests/memberlist.go b/internal/manifests/memberlist.go index 0ce6887a48..d28ac1b026 100644 --- a/internal/manifests/memberlist.go +++ b/internal/manifests/memberlist.go @@ -3,6 +3,8 @@ package manifests import ( "fmt" + "k8s.io/apimachinery/pkg/util/intstr" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -22,9 +24,10 @@ func BuildLokiGossipRingService(stackName string) *corev1.Service { ClusterIP: "None", Ports: []corev1.ServicePort{ { - Name: "gossip", - Port: gossipPort, - Protocol: "TCP", + Name: "gossip", + Port: gossipPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: gossipPort}, }, }, Selector: commonLabels(stackName), diff --git a/internal/manifests/mutate.go b/internal/manifests/mutate.go index e3ec77a984..c2ffbd2e09 100644 --- a/internal/manifests/mutate.go +++ b/internal/manifests/mutate.go @@ -3,6 +3,10 @@ package manifests import ( "reflect" + "github.com/ViaQ/logerr/log" + + "github.com/imdario/mergo" + monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/ViaQ/logerr/kverrors" @@ -22,8 +26,13 @@ import ( // - ServiceMonitor func MutateFuncFor(existing, desired client.Object) controllerutil.MutateFn { return func() error { - existing.SetAnnotations(desired.GetAnnotations()) - existing.SetLabels(desired.GetLabels()) + existingAnnotations := existing.GetAnnotations() + mergeWithOverride(&existingAnnotations, desired.GetAnnotations()) + existing.SetAnnotations(existingAnnotations) + + existingLabels := existing.GetLabels() + mergeWithOverride(&existingLabels, desired.GetLabels()) + existing.SetLabels(existingLabels) switch existing.(type) { case *corev1.ConfigMap: @@ -59,24 +68,31 @@ func MutateFuncFor(existing, desired client.Object) controllerutil.MutateFn { } } +func mergeWithOverride(dst, src interface{}) { + err := mergo.Merge(dst, src, mergo.WithOverride) + if err != nil { + log.Error(err, "unable to mergeWithOverride", "dst", dst, "src", src) + } +} + func mutateConfigMap(existing, desired *corev1.ConfigMap) { existing.BinaryData = desired.BinaryData } func mutateService(existing, desired *corev1.Service) { existing.Spec.Ports = desired.Spec.Ports - existing.Spec.Selector = desired.Spec.Selector + mergeWithOverride(&existing.Spec.Selector, desired.Spec.Selector) } func mutateDeployment(existing, desired *appsv1.Deployment) { // Deployment selector is immutable so we set this value only if // a new object is going to be created if existing.CreationTimestamp.IsZero() { - existing.Spec.Selector = desired.Spec.Selector + mergeWithOverride(existing.Spec.Selector, desired.Spec.Selector) } existing.Spec.Replicas = desired.Spec.Replicas - existing.Spec.Template = desired.Spec.Template - existing.Spec.Strategy = desired.Spec.Strategy + mergeWithOverride(&existing.Spec.Template, desired.Spec.Template) + mergeWithOverride(&existing.Spec.Strategy, desired.Spec.Strategy) } func mutateStatefulSet(existing, desired *appsv1.StatefulSet) { @@ -87,8 +103,12 @@ func mutateStatefulSet(existing, desired *appsv1.StatefulSet) { } existing.Spec.PodManagementPolicy = desired.Spec.PodManagementPolicy existing.Spec.Replicas = desired.Spec.Replicas - existing.Spec.Template = desired.Spec.Template - existing.Spec.VolumeClaimTemplates = desired.Spec.VolumeClaimTemplates + mergeWithOverride(&existing.Spec.Template, desired.Spec.Template) + for i := range existing.Spec.VolumeClaimTemplates { + existing.Spec.VolumeClaimTemplates[i].TypeMeta = desired.Spec.VolumeClaimTemplates[i].TypeMeta + existing.Spec.VolumeClaimTemplates[i].ObjectMeta = desired.Spec.VolumeClaimTemplates[i].ObjectMeta + existing.Spec.VolumeClaimTemplates[i].Spec = desired.Spec.VolumeClaimTemplates[i].Spec + } } func mutateServiceMonitor(existing, desired *monitoringv1.ServiceMonitor) { diff --git a/internal/manifests/querier.go b/internal/manifests/querier.go index fc0638d3f1..d4afadf116 100644 --- a/internal/manifests/querier.go +++ b/internal/manifests/querier.go @@ -39,6 +39,7 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet { Name: configVolumeName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ + DefaultMode: &defaultConfigMapMode, LocalObjectReference: corev1.LocalObjectReference{ Name: lokiConfigMapName(opts.Name), }, @@ -67,8 +68,11 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet { Scheme: corev1.URISchemeHTTP, }, }, + PeriodSeconds: 10, InitialDelaySeconds: 15, TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, }, LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ @@ -81,19 +85,23 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet { TimeoutSeconds: 2, PeriodSeconds: 30, FailureThreshold: 10, + SuccessThreshold: 1, }, Ports: []corev1.ContainerPort{ { Name: "metrics", ContainerPort: httpPort, + Protocol: protocolTCP, }, { Name: "grpc", ContainerPort: grpcPort, + Protocol: protocolTCP, }, { Name: "gossip-ring", ContainerPort: gossipPort, + Protocol: protocolTCP, }, }, VolumeMounts: []corev1.VolumeMount{ @@ -108,6 +116,9 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet { MountPath: dataDirectory, }, }, + TerminationMessagePath: "/dev/termination-log", + TerminationMessagePolicy: "File", + ImagePullPolicy: "IfNotPresent", }, }, } @@ -161,6 +172,7 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet { }, }, StorageClassName: pointer.StringPtr(opts.Stack.StorageClassName), + VolumeMode: &volumeFileSystemMode, }, }, }, @@ -185,8 +197,10 @@ func NewQuerierGRPCService(opts Options) *corev1.Service { ClusterIP: "None", Ports: []corev1.ServicePort{ { - Name: "grpc", - Port: grpcPort, + Name: "grpc", + Port: grpcPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: grpcPort}, }, }, Selector: l, @@ -213,8 +227,10 @@ func NewQuerierHTTPService(opts Options) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: "http", - Port: httpPort, + Name: "http", + Port: httpPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: httpPort}, }, }, Selector: l, diff --git a/internal/manifests/query-frontend.go b/internal/manifests/query-frontend.go index 1ba0fd3ba9..da01f68f95 100644 --- a/internal/manifests/query-frontend.go +++ b/internal/manifests/query-frontend.go @@ -38,6 +38,7 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment { Name: configVolumeName, VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ + DefaultMode: &defaultConfigMapMode, LocalObjectReference: corev1.LocalObjectReference{ Name: lokiConfigMapName(opts.Name), }, @@ -72,8 +73,11 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment { Scheme: corev1.URISchemeHTTP, }, }, + PeriodSeconds: 10, InitialDelaySeconds: 15, TimeoutSeconds: 1, + SuccessThreshold: 1, + FailureThreshold: 3, }, LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ @@ -86,15 +90,18 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment { TimeoutSeconds: 2, PeriodSeconds: 30, FailureThreshold: 10, + SuccessThreshold: 1, }, Ports: []corev1.ContainerPort{ { Name: "metrics", ContainerPort: httpPort, + Protocol: protocolTCP, }, { Name: "grpc", ContainerPort: grpcPort, + Protocol: protocolTCP, }, }, VolumeMounts: []corev1.VolumeMount{ @@ -109,6 +116,9 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment { MountPath: dataDirectory, }, }, + TerminationMessagePath: "/dev/termination-log", + TerminationMessagePolicy: "File", + ImagePullPolicy: "IfNotPresent", }, }, } @@ -167,8 +177,10 @@ func NewQueryFrontendGRPCService(opts Options) *corev1.Service { ClusterIP: "None", Ports: []corev1.ServicePort{ { - Name: "grpc", - Port: grpcPort, + Name: "grpc", + Port: grpcPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: grpcPort}, }, }, Selector: l, @@ -195,8 +207,10 @@ func NewQueryFrontendHTTPService(opts Options) *corev1.Service { Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { - Name: "http", - Port: httpPort, + Name: "http", + Port: httpPort, + Protocol: protocolTCP, + TargetPort: intstr.IntOrString{IntVal: httpPort}, }, }, Selector: l, diff --git a/internal/manifests/var.go b/internal/manifests/var.go index 76e436d7e4..fbdeec045f 100644 --- a/internal/manifests/var.go +++ b/internal/manifests/var.go @@ -4,15 +4,15 @@ import ( "fmt" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" ) const ( - gossipPort = 7946 - httpPort = 3100 - grpcPort = 9095 - + gossipPort = 7946 + httpPort = 3100 + grpcPort = 9095 + protocolTCP = "TCP" // DefaultContainerImage declares the default fallback for loki image. DefaultContainerImage = "docker.io/grafana/loki:2.2.1" @@ -38,6 +38,11 @@ const ( LabelQueryFrontendComponent string = "query-frontend" ) +var ( + defaultConfigMapMode = int32(420) + volumeFileSystemMode = corev1.PersistentVolumeFilesystem +) + func commonAnnotations(h string) map[string]string { return map[string]string{ "loki.openshift.io/config-hash": h,