From c91493a16bad171b02d3a46c053d7703c6621309 Mon Sep 17 00:00:00 2001 From: Brett Jones Date: Wed, 17 Mar 2021 14:28:10 -0500 Subject: [PATCH] create querier, and querier frontend --- config/manager/kustomization.yaml | 2 +- controllers/lokistack_controller.go | 3 +- internal/manifests/build.go | 16 +- internal/manifests/config/loki-config.yaml | 5 +- internal/manifests/distributor.go | 87 +++++----- internal/manifests/ingester.go | 86 ++++++---- internal/manifests/querier.go | 144 ++++++++++++++++ internal/manifests/query-frontend.go | 190 +++++++++++++++++++++ 8 files changed, 439 insertions(+), 94 deletions(-) create mode 100644 internal/manifests/querier.go create mode 100644 internal/manifests/query-frontend.go diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index b1f01063f4..2f61bf1675 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -13,4 +13,4 @@ kind: Kustomization images: - name: controller newName: quay.io/blockloop/loki-operator - newTag: "1615997355" + newTag: "1616008582" diff --git a/controllers/lokistack_controller.go b/controllers/lokistack_controller.go index da52782e4a..0b12ac3301 100644 --- a/controllers/lokistack_controller.go +++ b/controllers/lokistack_controller.go @@ -75,8 +75,7 @@ func (r *LokiStackReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( "object", obj) obj.SetNamespace(req.Namespace) - err := r.Create(ctx, obj) - if err != nil { + if err := r.Create(ctx, obj); err != nil { l.Error(err, "failed to create object") continue } diff --git a/internal/manifests/build.go b/internal/manifests/build.go index dc64b21d54..79cdb22b6c 100644 --- a/internal/manifests/build.go +++ b/internal/manifests/build.go @@ -15,18 +15,10 @@ func BuildAll(stackName, namespace string) ([]client.Object, error) { } res = append(res, cm) - res = append(res, DistributorDeployment(stackName)) - - for _, svc := range DistributorServices(stackName) { - res = append(res, svc) - } - - res = append(res, IngesterDeployment(stackName)) - - for _, svc := range IngesterServices(stackName) { - res = append(res, svc) - } - + res = append(res, BuildDistributor(stackName)...) + res = append(res, BuildIngester(stackName)...) + res = append(res, BuildQuerier(stackName)...) + res = append(res, BuildQueryFrontend(stackName)...) res = append(res, LokiGossipRingService(stackName)) return res, nil diff --git a/internal/manifests/config/loki-config.yaml b/internal/manifests/config/loki-config.yaml index 03a3e39c67..d4970036be 100644 --- a/internal/manifests/config/loki-config.yaml +++ b/internal/manifests/config/loki-config.yaml @@ -1,10 +1,7 @@ --- auth_enabled: false chunk_store_config: - max_look_back_period: 0s -# chunk_cache_config: -# filesystem: -# directory: /tmp/loki/chunk-cache + enable_fifocache: yes distributor: ring: kvstore: diff --git a/internal/manifests/distributor.go b/internal/manifests/distributor.go index 0eba8bf649..a1dfebb204 100644 --- a/internal/manifests/distributor.go +++ b/internal/manifests/distributor.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -19,8 +20,17 @@ const ( dataDirectory = "/tmp/loki" ) -// DistributorDeployment creates a deployment object for a distributor -func DistributorDeployment(stackName string) *apps.Deployment { +// BuildDistributor returns a list of k8s objects for Loki Distributor +func BuildDistributor(stackName string) []client.Object { + return []client.Object{ + NewDistributorDeployment(stackName), + NewDistributorHTTPService(stackName), + NewDistributorGRPCService(stackName), + } +} + +// NewDistributorDeployment creates a deployment object for a distributor +func NewDistributorDeployment(stackName string) *apps.Deployment { podSpec := core.PodSpec{ Volumes: []core.Volume{ { @@ -141,48 +151,49 @@ func DistributorDeployment(stackName string) *apps.Deployment { } } -func DistributorServices(stackName string) []*core.Service { +func NewDistributorHTTPService(stackName string) *core.Service { l := ComponentLabels("distributor", stackName) - - return []*core.Service{ - { - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: apps.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("loki-distributor-grpc-%s", stackName), - Labels: l, - }, - Spec: core.ServiceSpec{ - ClusterIP: "None", - Ports: []core.ServicePort{ - { - Name: "grpc", - Port: grpcPort, - }, + return &core.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: serviceNameDistributorGRPC(stackName), + Labels: l, + }, + Spec: core.ServiceSpec{ + ClusterIP: "None", + Ports: []core.ServicePort{ + { + Name: "grpc", + Port: grpcPort, }, - Selector: l, }, + Selector: l, }, - { - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: apps.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("loki-distributor-http-%s", stackName), - Labels: l, - }, - Spec: core.ServiceSpec{ - Ports: []core.ServicePort{ - { - Name: "metrics", - Port: httpPort, - }, + } +} + +func NewDistributorGRPCService(stackName string) *core.Service { + l := ComponentLabels("distributor", stackName) + return &core.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: serviceNameDistributorHTTP(stackName), + Labels: l, + }, + Spec: core.ServiceSpec{ + Ports: []core.ServicePort{ + { + Name: "metrics", + Port: httpPort, }, - Selector: l, }, + Selector: l, }, } } diff --git a/internal/manifests/ingester.go b/internal/manifests/ingester.go index 66ea7316bb..931949ea49 100644 --- a/internal/manifests/ingester.go +++ b/internal/manifests/ingester.go @@ -11,10 +11,20 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" ) -// IngesterDeployment creates a deployment object for an ingester -func IngesterDeployment(stackName string) *apps.Deployment { +// BuildIngester builds the k8s objects required to run Loki Ingester +func BuildIngester(stackName string) []client.Object { + return []client.Object{ + NewIngesterDeployment(stackName), + NewIngesterGRPCService(stackName), + NewIngesterHTTPService(stackName), + } +} + +// NewIngesterDeployment creates a deployment object for an ingester +func NewIngesterDeployment(stackName string) *apps.Deployment { podSpec := core.PodSpec{ Volumes: []core.Volume{ { @@ -135,47 +145,49 @@ func IngesterDeployment(stackName string) *apps.Deployment { } } -func IngesterServices(stackName string) []*core.Service { +func NewIngesterGRPCService(stackName string) *core.Service { l := ComponentLabels("ingester", stackName) - return []*core.Service{ - { - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: apps.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("loki-ingester-grpc-%s", stackName), - Labels: l, - }, - Spec: core.ServiceSpec{ - ClusterIP: "None", - Ports: []core.ServicePort{ - { - Name: "grpc", - Port: grpcPort, - }, + return &core.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: serviceNameIngesterGRPC(stackName), + Labels: l, + }, + Spec: core.ServiceSpec{ + ClusterIP: "None", + Ports: []core.ServicePort{ + { + Name: "grpc", + Port: grpcPort, }, - Selector: l, }, + Selector: l, }, - { - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: apps.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("loki-ingester-http-%s", stackName), - Labels: l, - }, - Spec: core.ServiceSpec{ - Ports: []core.ServicePort{ - { - Name: "metrics", - Port: httpPort, - }, + } +} + +func NewIngesterHTTPService(stackName string) *core.Service { + l := ComponentLabels("ingester", stackName) + return &core.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: serviceNameIngesterHTTP(stackName), + Labels: l, + }, + Spec: core.ServiceSpec{ + Ports: []core.ServicePort{ + { + Name: "metrics", + Port: httpPort, }, - Selector: l, }, + Selector: l, }, } } diff --git a/internal/manifests/querier.go b/internal/manifests/querier.go new file mode 100644 index 0000000000..495008a4ef --- /dev/null +++ b/internal/manifests/querier.go @@ -0,0 +1,144 @@ +package manifests + +import ( + "fmt" + "path" + + "github.com/ViaQ/loki-operator/internal/manifests/config" + apps "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// BuildQuerier returns a list of k8s objects for Loki Querier +func BuildQuerier(stackName string) []client.Object { + return []client.Object{ + NewQuerierDeployment(stackName), + } +} + +// NewQuerierDeployment creates a deployment object for a querier +func NewQuerierDeployment(stackName string) *apps.Deployment { + podSpec := core.PodSpec{ + Volumes: []core.Volume{ + { + Name: configVolumeName, + VolumeSource: core.VolumeSource{ + ConfigMap: &core.ConfigMapVolumeSource{ + LocalObjectReference: core.LocalObjectReference{ + Name: lokiConfigMapName(stackName), + }, + }, + }, + }, + { + Name: storageVolumeName, + VolumeSource: core.VolumeSource{ + EmptyDir: &core.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []core.Container{ + { + Image: containerImage, + Name: "loki-querier", + Args: []string{ + "-target=querier", + fmt.Sprintf("-config.file=%s", path.Join(config.LokiConfigMountDir, config.LokiConfigFileName)), + }, + ReadinessProbe: &core.Probe{ + Handler: core.Handler{ + HTTPGet: &core.HTTPGetAction{ + Path: "/ready", + Port: intstr.FromInt(httpPort), + Scheme: core.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 15, + TimeoutSeconds: 1, + }, + LivenessProbe: &core.Probe{ + Handler: core.Handler{ + HTTPGet: &core.HTTPGetAction{ + Path: "/metrics", + Port: intstr.FromInt(httpPort), + Scheme: core.URISchemeHTTP, + }, + }, + TimeoutSeconds: 2, + PeriodSeconds: 30, + FailureThreshold: 10, + }, + Ports: []core.ContainerPort{ + { + Name: "metrics", + ContainerPort: httpPort, + }, + { + Name: "grpc", + ContainerPort: grpcPort, + }, + { + Name: "gossip-ring", + ContainerPort: gossipPort, + }, + }, + // Resources: core.ResourceRequirements{ + // Limits: core.ResourceList{ + // core.ResourceMemory: resource.MustParse("1Gi"), + // core.ResourceCPU: resource.MustParse("1000m"), + // }, + // Requests: core.ResourceList{ + // core.ResourceMemory: resource.MustParse("50m"), + // core.ResourceCPU: resource.MustParse("50m"), + // }, + // }, + VolumeMounts: []core.VolumeMount{ + { + Name: configVolumeName, + ReadOnly: false, + MountPath: config.LokiConfigMountDir, + }, + { + Name: storageVolumeName, + ReadOnly: false, + MountPath: dataDirectory, + }, + }, + }, + }, + } + + l := ComponentLabels("querier", stackName) + + return &apps.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("loki-querier-%s", stackName), + Labels: l, + }, + Spec: apps.DeploymentSpec{ + Replicas: pointer.Int32Ptr(int32(3)), + Selector: &metav1.LabelSelector{ + MatchLabels: labels.Merge(l, GossipLabels()), + }, + Template: core.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("loki-querier-%s", stackName), + Labels: labels.Merge(l, GossipLabels()), + }, + Spec: podSpec, + }, + Strategy: apps.DeploymentStrategy{ + Type: apps.RollingUpdateDeploymentStrategyType, + }, + }, + } +} \ No newline at end of file diff --git a/internal/manifests/query-frontend.go b/internal/manifests/query-frontend.go new file mode 100644 index 0000000000..900500bee3 --- /dev/null +++ b/internal/manifests/query-frontend.go @@ -0,0 +1,190 @@ +package manifests + +import ( + "fmt" + "path" + + "github.com/ViaQ/loki-operator/internal/manifests/config" + apps "k8s.io/api/apps/v1" + core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// BuildQueryFrontend returns a list of k8s objects for Loki QueryFrontend +func BuildQueryFrontend(stackName string) []client.Object { + return []client.Object{ + NewQueryFrontendDeployment(stackName), + NewQueryFrontendGRPCService(stackName), + NewQueryFrontendHTTPService(stackName), + } +} + +// NewQueryFrontendDeployment creates a deployment object for a query-frontend +func NewQueryFrontendDeployment(stackName string) *apps.Deployment { + podSpec := core.PodSpec{ + Volumes: []core.Volume{ + { + Name: configVolumeName, + VolumeSource: core.VolumeSource{ + ConfigMap: &core.ConfigMapVolumeSource{ + LocalObjectReference: core.LocalObjectReference{ + Name: lokiConfigMapName(stackName), + }, + }, + }, + }, + { + Name: storageVolumeName, + VolumeSource: core.VolumeSource{ + EmptyDir: &core.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []core.Container{ + { + Image: containerImage, + Name: "loki-query-frontend", + Args: []string{ + "-target=query-frontend", + fmt.Sprintf("-config.file=%s", path.Join(config.LokiConfigMountDir, config.LokiConfigFileName)), + }, + ReadinessProbe: &core.Probe{ + Handler: core.Handler{ + HTTPGet: &core.HTTPGetAction{ + Path: "/metrics", + Port: intstr.FromInt(httpPort), + Scheme: core.URISchemeHTTP, + }, + }, + InitialDelaySeconds: 15, + TimeoutSeconds: 1, + }, + LivenessProbe: &core.Probe{ + Handler: core.Handler{ + HTTPGet: &core.HTTPGetAction{ + Path: "/metrics", + Port: intstr.FromInt(httpPort), + Scheme: core.URISchemeHTTP, + }, + }, + TimeoutSeconds: 2, + PeriodSeconds: 30, + FailureThreshold: 10, + }, + Ports: []core.ContainerPort{ + { + Name: "metrics", + ContainerPort: httpPort, + }, + { + Name: "grpc", + ContainerPort: grpcPort, + }, + }, + // Resources: core.ResourceRequirements{ + // Limits: core.ResourceList{ + // core.ResourceMemory: resource.MustParse("1Gi"), + // core.ResourceCPU: resource.MustParse("1000m"), + // }, + // Requests: core.ResourceList{ + // core.ResourceMemory: resource.MustParse("50m"), + // core.ResourceCPU: resource.MustParse("50m"), + // }, + // }, + VolumeMounts: []core.VolumeMount{ + { + Name: configVolumeName, + ReadOnly: false, + MountPath: config.LokiConfigMountDir, + }, + { + Name: storageVolumeName, + ReadOnly: false, + MountPath: dataDirectory, + }, + }, + }, + }, + } + + l := ComponentLabels("query-frontend", stackName) + + return &apps.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("loki-query-frontend-%s", stackName), + Labels: l, + }, + Spec: apps.DeploymentSpec{ + Replicas: pointer.Int32Ptr(int32(3)), + Selector: &metav1.LabelSelector{ + MatchLabels: labels.Merge(l, GossipLabels()), + }, + Template: core.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("loki-query-frontend-%s", stackName), + Labels: labels.Merge(l, GossipLabels()), + }, + Spec: podSpec, + }, + Strategy: apps.DeploymentStrategy{ + Type: apps.RollingUpdateDeploymentStrategyType, + }, + }, + } +} + +func NewQueryFrontendGRPCService(stackName string) *core.Service { + l := ComponentLabels("query-frontend", stackName) + return &core.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("loki-query-frontend-grpc-%s", stackName), + Labels: l, + }, + Spec: core.ServiceSpec{ + ClusterIP: "None", + Ports: []core.ServicePort{ + { + Name: "grpc", + Port: grpcPort, + }, + }, + Selector: l, + }, + } +} + +func NewQueryFrontendHTTPService(stackName string) *core.Service { + l := ComponentLabels("query-frontend", stackName) + return &core.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: apps.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("loki-query-frontend-http-%s", stackName), + Labels: l, + }, + Spec: core.ServiceSpec{ + ClusterIP: "None", + Ports: []core.ServicePort{ + { + Name: "grpc", + Port: grpcPort, + }, + }, + Selector: l, + }, + } +}