fix manifests and mutate function (#72)

pull/4881/head
Eran Raichstein 4 years ago committed by GitHub
parent 0094783a1b
commit 02c67bf879
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      internal/manifests/compactor.go
  2. 23
      internal/manifests/distributor.go
  3. 25
      internal/manifests/ingester.go
  4. 9
      internal/manifests/memberlist.go
  5. 36
      internal/manifests/mutate.go
  6. 24
      internal/manifests/querier.go
  7. 22
      internal/manifests/query-frontend.go
  8. 15
      internal/manifests/var.go

@ -40,6 +40,7 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet {
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
DefaultMode: &defaultConfigMapMode,
LocalObjectReference: corev1.LocalObjectReference{
Name: lokiConfigMapName(opts.Name),
},
@ -68,8 +69,11 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet {
Scheme: corev1.URISchemeHTTP,
},
},
PeriodSeconds: 10,
InitialDelaySeconds: 15,
TimeoutSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
@ -82,15 +86,18 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet {
TimeoutSeconds: 2,
PeriodSeconds: 30,
FailureThreshold: 10,
SuccessThreshold: 1,
},
Ports: []corev1.ContainerPort{
{
Name: "metrics",
ContainerPort: httpPort,
Protocol: protocolTCP,
},
{
Name: "grpc",
ContainerPort: grpcPort,
Protocol: protocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
@ -105,6 +112,9 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet {
MountPath: dataDirectory,
},
},
TerminationMessagePath: "/dev/termination-log",
TerminationMessagePolicy: "File",
ImagePullPolicy: "IfNotPresent",
},
},
}
@ -116,7 +126,6 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet {
l := ComponentLabels(LabelCompactorComponent, opts.Name)
a := commonAnnotations(opts.ConfigSHA1)
return &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
@ -157,6 +166,7 @@ func NewCompactorStatefulSet(opts Options) *appsv1.StatefulSet {
corev1.ResourceStorage: opts.ResourceRequirements.Compactor.PVCSize,
},
},
VolumeMode: &volumeFileSystemMode,
StorageClassName: pointer.StringPtr(opts.Stack.StorageClassName),
},
},
@ -182,8 +192,10 @@ func NewCompactorGRPCService(opts Options) *corev1.Service {
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: "grpc",
Port: grpcPort,
Name: "grpc",
Port: grpcPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: grpcPort},
},
},
Selector: l,
@ -210,8 +222,10 @@ func NewCompactorHTTPService(opts Options) *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "metrics",
Port: httpPort,
Name: "metrics",
Port: httpPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: httpPort},
},
},
Selector: l,

@ -45,6 +45,7 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment {
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
DefaultMode: &defaultConfigMapMode,
LocalObjectReference: corev1.LocalObjectReference{
Name: lokiConfigMapName(opts.Name),
},
@ -79,8 +80,11 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment {
Scheme: corev1.URISchemeHTTP,
},
},
PeriodSeconds: 10,
InitialDelaySeconds: 15,
TimeoutSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
@ -93,19 +97,23 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment {
TimeoutSeconds: 2,
PeriodSeconds: 30,
FailureThreshold: 10,
SuccessThreshold: 1,
},
Ports: []corev1.ContainerPort{
{
Name: "metrics",
ContainerPort: httpPort,
Protocol: protocolTCP,
},
{
Name: "grpc",
ContainerPort: grpcPort,
Protocol: protocolTCP,
},
{
Name: "gossip-ring",
ContainerPort: gossipPort,
Protocol: protocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
@ -120,6 +128,9 @@ func NewDistributorDeployment(opts Options) *appsv1.Deployment {
MountPath: dataDirectory,
},
},
TerminationMessagePath: "/dev/termination-log",
TerminationMessagePolicy: "File",
ImagePullPolicy: "IfNotPresent",
},
},
}
@ -178,8 +189,10 @@ func NewDistributorGRPCService(opts Options) *corev1.Service {
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: "grpc",
Port: grpcPort,
Name: "grpc",
Port: grpcPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: grpcPort},
},
},
Selector: l,
@ -206,8 +219,10 @@ func NewDistributorHTTPService(opts Options) *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "metrics",
Port: httpPort,
Name: "metrics",
Port: httpPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: httpPort},
},
},
Selector: l,

@ -39,6 +39,7 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet {
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
DefaultMode: &defaultConfigMapMode,
LocalObjectReference: corev1.LocalObjectReference{
Name: lokiConfigMapName(opts.Name),
},
@ -67,8 +68,11 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet {
Scheme: corev1.URISchemeHTTP,
},
},
PeriodSeconds: 10,
InitialDelaySeconds: 15,
TimeoutSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
@ -81,19 +85,23 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet {
TimeoutSeconds: 2,
PeriodSeconds: 30,
FailureThreshold: 10,
SuccessThreshold: 1,
},
Ports: []corev1.ContainerPort{
{
Name: "metrics",
ContainerPort: httpPort,
Protocol: protocolTCP,
},
{
Name: "grpc",
ContainerPort: grpcPort,
Protocol: protocolTCP,
},
{
Name: "gossip-ring",
ContainerPort: gossipPort,
Protocol: protocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
@ -108,6 +116,9 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet {
MountPath: dataDirectory,
},
},
TerminationMessagePath: "/dev/termination-log",
TerminationMessagePolicy: "File",
ImagePullPolicy: "IfNotPresent",
},
},
}
@ -119,7 +130,6 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet {
l := ComponentLabels(LabelIngesterComponent, opts.Name)
a := commonAnnotations(opts.ConfigSHA1)
return &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
@ -161,6 +171,7 @@ func NewIngesterStatefulSet(opts Options) *appsv1.StatefulSet {
},
},
StorageClassName: pointer.StringPtr(opts.Stack.StorageClassName),
VolumeMode: &volumeFileSystemMode,
},
},
},
@ -185,8 +196,10 @@ func NewIngesterGRPCService(opts Options) *corev1.Service {
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: "grpc",
Port: grpcPort,
Name: "grpc",
Port: grpcPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: grpcPort},
},
},
Selector: l,
@ -213,8 +226,10 @@ func NewIngesterHTTPService(opts Options) *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "metrics",
Port: httpPort,
Name: "metrics",
Port: httpPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: httpPort},
},
},
Selector: l,

@ -3,6 +3,8 @@ package manifests
import (
"fmt"
"k8s.io/apimachinery/pkg/util/intstr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -22,9 +24,10 @@ func BuildLokiGossipRingService(stackName string) *corev1.Service {
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: "gossip",
Port: gossipPort,
Protocol: "TCP",
Name: "gossip",
Port: gossipPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: gossipPort},
},
},
Selector: commonLabels(stackName),

@ -3,6 +3,10 @@ package manifests
import (
"reflect"
"github.com/ViaQ/logerr/log"
"github.com/imdario/mergo"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/ViaQ/logerr/kverrors"
@ -22,8 +26,13 @@ import (
// - ServiceMonitor
func MutateFuncFor(existing, desired client.Object) controllerutil.MutateFn {
return func() error {
existing.SetAnnotations(desired.GetAnnotations())
existing.SetLabels(desired.GetLabels())
existingAnnotations := existing.GetAnnotations()
mergeWithOverride(&existingAnnotations, desired.GetAnnotations())
existing.SetAnnotations(existingAnnotations)
existingLabels := existing.GetLabels()
mergeWithOverride(&existingLabels, desired.GetLabels())
existing.SetLabels(existingLabels)
switch existing.(type) {
case *corev1.ConfigMap:
@ -59,24 +68,31 @@ func MutateFuncFor(existing, desired client.Object) controllerutil.MutateFn {
}
}
func mergeWithOverride(dst, src interface{}) {
err := mergo.Merge(dst, src, mergo.WithOverride)
if err != nil {
log.Error(err, "unable to mergeWithOverride", "dst", dst, "src", src)
}
}
func mutateConfigMap(existing, desired *corev1.ConfigMap) {
existing.BinaryData = desired.BinaryData
}
func mutateService(existing, desired *corev1.Service) {
existing.Spec.Ports = desired.Spec.Ports
existing.Spec.Selector = desired.Spec.Selector
mergeWithOverride(&existing.Spec.Selector, desired.Spec.Selector)
}
func mutateDeployment(existing, desired *appsv1.Deployment) {
// Deployment selector is immutable so we set this value only if
// a new object is going to be created
if existing.CreationTimestamp.IsZero() {
existing.Spec.Selector = desired.Spec.Selector
mergeWithOverride(existing.Spec.Selector, desired.Spec.Selector)
}
existing.Spec.Replicas = desired.Spec.Replicas
existing.Spec.Template = desired.Spec.Template
existing.Spec.Strategy = desired.Spec.Strategy
mergeWithOverride(&existing.Spec.Template, desired.Spec.Template)
mergeWithOverride(&existing.Spec.Strategy, desired.Spec.Strategy)
}
func mutateStatefulSet(existing, desired *appsv1.StatefulSet) {
@ -87,8 +103,12 @@ func mutateStatefulSet(existing, desired *appsv1.StatefulSet) {
}
existing.Spec.PodManagementPolicy = desired.Spec.PodManagementPolicy
existing.Spec.Replicas = desired.Spec.Replicas
existing.Spec.Template = desired.Spec.Template
existing.Spec.VolumeClaimTemplates = desired.Spec.VolumeClaimTemplates
mergeWithOverride(&existing.Spec.Template, desired.Spec.Template)
for i := range existing.Spec.VolumeClaimTemplates {
existing.Spec.VolumeClaimTemplates[i].TypeMeta = desired.Spec.VolumeClaimTemplates[i].TypeMeta
existing.Spec.VolumeClaimTemplates[i].ObjectMeta = desired.Spec.VolumeClaimTemplates[i].ObjectMeta
existing.Spec.VolumeClaimTemplates[i].Spec = desired.Spec.VolumeClaimTemplates[i].Spec
}
}
func mutateServiceMonitor(existing, desired *monitoringv1.ServiceMonitor) {

@ -39,6 +39,7 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet {
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
DefaultMode: &defaultConfigMapMode,
LocalObjectReference: corev1.LocalObjectReference{
Name: lokiConfigMapName(opts.Name),
},
@ -67,8 +68,11 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet {
Scheme: corev1.URISchemeHTTP,
},
},
PeriodSeconds: 10,
InitialDelaySeconds: 15,
TimeoutSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
@ -81,19 +85,23 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet {
TimeoutSeconds: 2,
PeriodSeconds: 30,
FailureThreshold: 10,
SuccessThreshold: 1,
},
Ports: []corev1.ContainerPort{
{
Name: "metrics",
ContainerPort: httpPort,
Protocol: protocolTCP,
},
{
Name: "grpc",
ContainerPort: grpcPort,
Protocol: protocolTCP,
},
{
Name: "gossip-ring",
ContainerPort: gossipPort,
Protocol: protocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
@ -108,6 +116,9 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet {
MountPath: dataDirectory,
},
},
TerminationMessagePath: "/dev/termination-log",
TerminationMessagePolicy: "File",
ImagePullPolicy: "IfNotPresent",
},
},
}
@ -161,6 +172,7 @@ func NewQuerierStatefulSet(opts Options) *appsv1.StatefulSet {
},
},
StorageClassName: pointer.StringPtr(opts.Stack.StorageClassName),
VolumeMode: &volumeFileSystemMode,
},
},
},
@ -185,8 +197,10 @@ func NewQuerierGRPCService(opts Options) *corev1.Service {
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: "grpc",
Port: grpcPort,
Name: "grpc",
Port: grpcPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: grpcPort},
},
},
Selector: l,
@ -213,8 +227,10 @@ func NewQuerierHTTPService(opts Options) *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http",
Port: httpPort,
Name: "http",
Port: httpPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: httpPort},
},
},
Selector: l,

@ -38,6 +38,7 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment {
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
DefaultMode: &defaultConfigMapMode,
LocalObjectReference: corev1.LocalObjectReference{
Name: lokiConfigMapName(opts.Name),
},
@ -72,8 +73,11 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment {
Scheme: corev1.URISchemeHTTP,
},
},
PeriodSeconds: 10,
InitialDelaySeconds: 15,
TimeoutSeconds: 1,
SuccessThreshold: 1,
FailureThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
@ -86,15 +90,18 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment {
TimeoutSeconds: 2,
PeriodSeconds: 30,
FailureThreshold: 10,
SuccessThreshold: 1,
},
Ports: []corev1.ContainerPort{
{
Name: "metrics",
ContainerPort: httpPort,
Protocol: protocolTCP,
},
{
Name: "grpc",
ContainerPort: grpcPort,
Protocol: protocolTCP,
},
},
VolumeMounts: []corev1.VolumeMount{
@ -109,6 +116,9 @@ func NewQueryFrontendDeployment(opts Options) *appsv1.Deployment {
MountPath: dataDirectory,
},
},
TerminationMessagePath: "/dev/termination-log",
TerminationMessagePolicy: "File",
ImagePullPolicy: "IfNotPresent",
},
},
}
@ -167,8 +177,10 @@ func NewQueryFrontendGRPCService(opts Options) *corev1.Service {
ClusterIP: "None",
Ports: []corev1.ServicePort{
{
Name: "grpc",
Port: grpcPort,
Name: "grpc",
Port: grpcPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: grpcPort},
},
},
Selector: l,
@ -195,8 +207,10 @@ func NewQueryFrontendHTTPService(opts Options) *corev1.Service {
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Name: "http",
Port: httpPort,
Name: "http",
Port: httpPort,
Protocol: protocolTCP,
TargetPort: intstr.IntOrString{IntVal: httpPort},
},
},
Selector: l,

@ -4,15 +4,15 @@ import (
"fmt"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
const (
gossipPort = 7946
httpPort = 3100
grpcPort = 9095
gossipPort = 7946
httpPort = 3100
grpcPort = 9095
protocolTCP = "TCP"
// DefaultContainerImage declares the default fallback for loki image.
DefaultContainerImage = "docker.io/grafana/loki:2.2.1"
@ -38,6 +38,11 @@ const (
LabelQueryFrontendComponent string = "query-frontend"
)
var (
defaultConfigMapMode = int32(420)
volumeFileSystemMode = corev1.PersistentVolumeFilesystem
)
func commonAnnotations(h string) map[string]string {
return map[string]string{
"loki.openshift.io/config-hash": h,

Loading…
Cancel
Save