operator: Refactor status update to reduce API calls (#8578)

pull/8597/head
Robert Jacob 2 years ago committed by GitHub
parent 233997ef20
commit 93c35a7a13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      operator/CHANGELOG.md
  2. 3
      operator/controllers/loki/lokistack_controller.go
  3. 50
      operator/internal/status/components.go
  4. 371
      operator/internal/status/components_test.go
  5. 87
      operator/internal/status/lokistack.go
  6. 434
      operator/internal/status/lokistack_test.go
  7. 94
      operator/internal/status/status.go
  8. 83
      operator/internal/status/status_test.go

@ -1,5 +1,6 @@
## Main
- [8578](https://github.com/grafana/loki/pull/8578) **xperimental**: Refactor status update to reduce API calls
- [8577](https://github.com/grafana/loki/pull/8577) **Red-GV**: Store gateway tenant information in secret instead of configmap
- [8397](https://github.com/grafana/loki/pull/8397) **periklis**: Update Loki operand to v2.7.3
- [8308](https://github.com/grafana/loki/pull/8308) **aminesnow**: operator: Cleanup ruler resources when disabled

@ -3,6 +3,7 @@ package controllers
import (
"context"
"errors"
"time"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
@ -158,7 +159,7 @@ func (r *LokiStackReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return res, derr
}
err = status.Refresh(ctx, r.Client, req)
err = status.Refresh(ctx, r.Client, req, time.Now())
if err != nil {
return ctrl.Result{}, err
}

@ -9,64 +9,54 @@ import (
"github.com/grafana/loki/operator/internal/manifests"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// SetComponentsStatus updates the pod status map component
func SetComponentsStatus(ctx context.Context, k k8s.Client, req ctrl.Request) error {
var s lokiv1.LokiStack
if err := k.Get(ctx, req.NamespacedName, &s); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return kverrors.Wrap(err, "failed to lookup lokistack", "name", req.NamespacedName)
}
// generateComponentStatus updates the pod status map component
func generateComponentStatus(ctx context.Context, k k8s.Client, s *lokiv1.LokiStack) (*lokiv1.LokiStackComponentStatus, error) {
var err error
s.Status.Components = lokiv1.LokiStackComponentStatus{}
s.Status.Components.Compactor, err = appendPodStatus(ctx, k, manifests.LabelCompactorComponent, s.Name, s.Namespace)
result := &lokiv1.LokiStackComponentStatus{}
result.Compactor, err = appendPodStatus(ctx, k, manifests.LabelCompactorComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelCompactorComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelCompactorComponent)
}
s.Status.Components.Querier, err = appendPodStatus(ctx, k, manifests.LabelQuerierComponent, s.Name, s.Namespace)
result.Querier, err = appendPodStatus(ctx, k, manifests.LabelQuerierComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelQuerierComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelQuerierComponent)
}
s.Status.Components.Distributor, err = appendPodStatus(ctx, k, manifests.LabelDistributorComponent, s.Name, s.Namespace)
result.Distributor, err = appendPodStatus(ctx, k, manifests.LabelDistributorComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelDistributorComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelDistributorComponent)
}
s.Status.Components.QueryFrontend, err = appendPodStatus(ctx, k, manifests.LabelQueryFrontendComponent, s.Name, s.Namespace)
result.QueryFrontend, err = appendPodStatus(ctx, k, manifests.LabelQueryFrontendComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelQueryFrontendComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelQueryFrontendComponent)
}
s.Status.Components.IndexGateway, err = appendPodStatus(ctx, k, manifests.LabelIndexGatewayComponent, s.Name, s.Namespace)
result.IndexGateway, err = appendPodStatus(ctx, k, manifests.LabelIndexGatewayComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelIngesterComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelIngesterComponent)
}
s.Status.Components.Ingester, err = appendPodStatus(ctx, k, manifests.LabelIngesterComponent, s.Name, s.Namespace)
result.Ingester, err = appendPodStatus(ctx, k, manifests.LabelIngesterComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelIndexGatewayComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelIndexGatewayComponent)
}
s.Status.Components.Gateway, err = appendPodStatus(ctx, k, manifests.LabelGatewayComponent, s.Name, s.Namespace)
result.Gateway, err = appendPodStatus(ctx, k, manifests.LabelGatewayComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelGatewayComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelGatewayComponent)
}
s.Status.Components.Ruler, err = appendPodStatus(ctx, k, manifests.LabelRulerComponent, s.Name, s.Namespace)
result.Ruler, err = appendPodStatus(ctx, k, manifests.LabelRulerComponent, s.Name, s.Namespace)
if err != nil {
return kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelRulerComponent)
return nil, kverrors.Wrap(err, "failed lookup LokiStack component pods status", "name", manifests.LabelRulerComponent)
}
return k.Status().Update(ctx, &s, &client.UpdateOptions{})
return result, nil
}
func appendPodStatus(ctx context.Context, k k8s.Client, component, stack, ns string) (lokiv1.PodStatusMap, error) {

@ -1,316 +1,137 @@
package status_test
package status
import (
"context"
"fmt"
"testing"
lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/external/k8s/k8sfakes"
"github.com/grafana/loki/operator/internal/status"
"github.com/grafana/loki/operator/internal/manifests"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
func TestSetComponentsStatus_WhenGetLokiStackReturnsError_ReturnError(t *testing.T) {
k := &k8sfakes.FakeClient{}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewBadRequest("something wasn't found")
}
err := status.SetComponentsStatus(context.TODO(), k, r)
require.Error(t, err)
}
func TestSetComponentsStatus_WhenGetLokiStackReturnsNotFound_DoNothing(t *testing.T) {
k := &k8sfakes.FakeClient{}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
func createPodList(baseName string, phases ...corev1.PodPhase) *corev1.PodList {
items := []corev1.Pod{}
for i, p := range phases {
items = append(items, corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pod-%d", baseName, i),
},
Status: corev1.PodStatus{
Phase: p,
},
})
}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
return &corev1.PodList{
Items: items,
}
err := status.SetComponentsStatus(context.TODO(), k, r)
require.NoError(t, err)
}
func TestSetComponentsStatus_WhenListReturnError_ReturnError(t *testing.T) {
sw := &k8sfakes.FakeStatusWriter{}
k := &k8sfakes.FakeClient{}
k.StatusStub = func() client.StatusWriter { return sw }
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
if r.Name == name.Name && r.Namespace == name.Namespace {
k.SetClientObject(object, &s)
return nil
func setupListClient(t *testing.T, stack *lokiv1.LokiStack, componentPods map[string]*corev1.PodList) (*k8sfakes.FakeClient, *k8sfakes.FakeStatusWriter) {
k, sw := setupFakesNoError(t, stack)
k.ListStub = func(_ context.Context, list client.ObjectList, options ...client.ListOption) error {
componentLabel := ""
for _, o := range options {
if m, ok := o.(client.MatchingLabels); ok {
componentLabel = m["app.kubernetes.io/component"]
}
}
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
k.ListStub = func(_ context.Context, l client.ObjectList, opts ...client.ListOption) error {
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
err := status.SetComponentsStatus(context.TODO(), k, r)
require.Error(t, err)
}
func TestSetComponentsStatus_WhenPodListExisting_SetPodStatusMap(t *testing.T) {
sw := &k8sfakes.FakeStatusWriter{}
k := &k8sfakes.FakeClient{}
k.StatusStub = func() client.StatusWriter { return sw }
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
if r.Name == name.Name && r.Namespace == name.Namespace {
k.SetClientObject(object, &s)
return nil
if componentLabel == "" {
t.Fatalf("no component label on list call: %s", options)
}
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
k.ListStub = func(_ context.Context, l client.ObjectList, _ ...client.ListOption) error {
pods := v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-a",
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-b",
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
},
},
podList, ok := componentPods[componentLabel]
if !ok {
t.Fatalf("no pods found for label: %s", componentLabel)
}
k.SetClientObjectList(l, &pods)
return nil
}
expected := lokiv1.PodStatusMap{
"Pending": []string{"pod-a"},
"Running": []string{"pod-b"},
}
sw.UpdateStub = func(_ context.Context, obj client.Object, _ ...client.UpdateOption) error {
stack := obj.(*lokiv1.LokiStack)
require.Equal(t, expected, stack.Status.Components.Compactor)
k.SetClientObjectList(list, podList)
return nil
}
err := status.SetComponentsStatus(context.TODO(), k, r)
require.NoError(t, err)
require.NotZero(t, k.ListCallCount())
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
return k, sw
}
func TestSetComponentsStatus_WhenRulerEnabled_SetPodStatusMap(t *testing.T) {
sw := &k8sfakes.FakeStatusWriter{}
k := &k8sfakes.FakeClient{}
k.StatusStub = func() client.StatusWriter { return sw }
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Spec: lokiv1.LokiStackSpec{
Rules: &lokiv1.RulesSpec{
Enabled: true,
func TestGenerateComponentStatus(t *testing.T) {
tt := []struct {
desc string
componentPods map[string]*corev1.PodList
wantComponentStatus *lokiv1.LokiStackComponentStatus
}{
{
desc: "no pods",
componentPods: map[string]*corev1.PodList{
manifests.LabelCompactorComponent: {},
manifests.LabelDistributorComponent: {},
manifests.LabelIngesterComponent: {},
manifests.LabelQuerierComponent: {},
manifests.LabelQueryFrontendComponent: {},
manifests.LabelIndexGatewayComponent: {},
manifests.LabelRulerComponent: {},
manifests.LabelGatewayComponent: {},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
if r.Name == name.Name && r.Namespace == name.Namespace {
k.SetClientObject(object, &s)
return nil
}
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
k.ListStub = func(_ context.Context, l client.ObjectList, _ ...client.ListOption) error {
pods := v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-a",
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-b",
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
},
wantComponentStatus: &lokiv1.LokiStackComponentStatus{
Compactor: map[corev1.PodPhase][]string{},
Distributor: map[corev1.PodPhase][]string{},
IndexGateway: map[corev1.PodPhase][]string{},
Ingester: map[corev1.PodPhase][]string{},
Querier: map[corev1.PodPhase][]string{},
QueryFrontend: map[corev1.PodPhase][]string{},
Gateway: map[corev1.PodPhase][]string{},
Ruler: map[corev1.PodPhase][]string{},
},
}
k.SetClientObjectList(l, &pods)
return nil
}
expected := lokiv1.PodStatusMap{
"Pending": []string{"pod-a"},
"Running": []string{"pod-b"},
}
sw.UpdateStub = func(_ context.Context, obj client.Object, _ ...client.UpdateOption) error {
stack := obj.(*lokiv1.LokiStack)
require.Equal(t, expected, stack.Status.Components.Ruler)
return nil
}
err := status.SetComponentsStatus(context.TODO(), k, r)
require.NoError(t, err)
require.NotZero(t, k.ListCallCount())
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetComponentsStatus_WhenRulerNotEnabled_DoNothing(t *testing.T) {
sw := &k8sfakes.FakeStatusWriter{}
k := &k8sfakes.FakeClient{}
k.StatusStub = func() client.StatusWriter { return sw }
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Spec: lokiv1.LokiStackSpec{
Rules: &lokiv1.RulesSpec{
Enabled: false,
{
desc: "all one pod running",
componentPods: map[string]*corev1.PodList{
manifests.LabelCompactorComponent: createPodList(manifests.LabelCompactorComponent, corev1.PodRunning),
manifests.LabelDistributorComponent: createPodList(manifests.LabelDistributorComponent, corev1.PodRunning),
manifests.LabelIngesterComponent: createPodList(manifests.LabelIngesterComponent, corev1.PodRunning),
manifests.LabelQuerierComponent: createPodList(manifests.LabelQuerierComponent, corev1.PodRunning),
manifests.LabelQueryFrontendComponent: createPodList(manifests.LabelQueryFrontendComponent, corev1.PodRunning),
manifests.LabelIndexGatewayComponent: createPodList(manifests.LabelIndexGatewayComponent, corev1.PodRunning),
manifests.LabelRulerComponent: createPodList(manifests.LabelRulerComponent, corev1.PodRunning),
manifests.LabelGatewayComponent: createPodList(manifests.LabelGatewayComponent, corev1.PodRunning),
},
wantComponentStatus: &lokiv1.LokiStackComponentStatus{
Compactor: map[corev1.PodPhase][]string{corev1.PodRunning: {"compactor-pod-0"}},
Distributor: map[corev1.PodPhase][]string{corev1.PodRunning: {"distributor-pod-0"}},
IndexGateway: map[corev1.PodPhase][]string{corev1.PodRunning: {"index-gateway-pod-0"}},
Ingester: map[corev1.PodPhase][]string{corev1.PodRunning: {"ingester-pod-0"}},
Querier: map[corev1.PodPhase][]string{corev1.PodRunning: {"querier-pod-0"}},
QueryFrontend: map[corev1.PodPhase][]string{corev1.PodRunning: {"query-frontend-pod-0"}},
Gateway: map[corev1.PodPhase][]string{corev1.PodRunning: {"lokistack-gateway-pod-0"}},
Ruler: map[corev1.PodPhase][]string{corev1.PodRunning: {"ruler-pod-0"}},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
if r.Name == name.Name && r.Namespace == name.Namespace {
k.SetClientObject(object, &s)
return nil
}
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
for _, tc := range tt {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
k.ListStub = func(_ context.Context, l client.ObjectList, o ...client.ListOption) error {
s := o[0].(client.MatchingLabels)
stack := &lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
c, ok := s["app.kubernetes.io/component"]
if !ok || c == "ruler" {
return nil
}
k, _ := setupListClient(t, stack, tc.componentPods)
pods := v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-a",
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-b",
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
},
},
}
k.SetClientObjectList(l, &pods)
return nil
}
componentStatus, err := generateComponentStatus(context.Background(), k, stack)
require.NoError(t, err)
require.Equal(t, tc.wantComponentStatus, componentStatus)
sw.UpdateStub = func(_ context.Context, obj client.Object, _ ...client.UpdateOption) error {
stack := obj.(*lokiv1.LokiStack)
require.Equal(t, stack.Status.Components.Ruler, lokiv1.PodStatusMap{})
return nil
// one list call for each component
require.Equal(t, 8, k.ListCallCount())
})
}
err := status.SetComponentsStatus(context.TODO(), k, r)
require.NoError(t, err)
require.NotZero(t, k.ListCallCount())
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}

@ -7,6 +7,7 @@ import (
"github.com/ViaQ/logerr/v2/kverrors"
lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/external/k8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -20,51 +21,33 @@ const (
messagePending = "Some LokiStack components pending on dependencies"
)
// DegradedError contains information about why the managed LokiStack has an invalid configuration.
type DegradedError struct {
Message string
Reason lokiv1.LokiStackConditionReason
Requeue bool
}
func (e *DegradedError) Error() string {
return fmt.Sprintf("cluster degraded: %s", e.Message)
}
// SetReadyCondition updates or appends the condition Ready to the lokistack status conditions.
// In addition it resets all other Status conditions to false.
func SetReadyCondition(ctx context.Context, k k8s.Client, req ctrl.Request) error {
ready := metav1.Condition{
Type: string(lokiv1.ConditionReady),
Message: messageReady,
Reason: string(lokiv1.ReasonReadyComponents),
}
return updateCondition(ctx, k, req, ready)
}
// SetFailedCondition updates or appends the condition Failed to the lokistack status conditions.
// In addition it resets all other Status conditions to false.
func SetFailedCondition(ctx context.Context, k k8s.Client, req ctrl.Request) error {
failed := metav1.Condition{
var (
conditionFailed = metav1.Condition{
Type: string(lokiv1.ConditionFailed),
Message: messageFailed,
Reason: string(lokiv1.ReasonFailedComponents),
}
return updateCondition(ctx, k, req, failed)
}
// SetPendingCondition updates or appends the condition Pending to the lokistack status conditions.
// In addition it resets all other Status conditions to false.
func SetPendingCondition(ctx context.Context, k k8s.Client, req ctrl.Request) error {
pending := metav1.Condition{
conditionPending = metav1.Condition{
Type: string(lokiv1.ConditionPending),
Message: messagePending,
Reason: string(lokiv1.ReasonPendingComponents),
}
conditionReady = metav1.Condition{
Type: string(lokiv1.ConditionReady),
Message: messageReady,
Reason: string(lokiv1.ReasonReadyComponents),
}
)
// DegradedError contains information about why the managed LokiStack has an invalid configuration.
type DegradedError struct {
Message string
Reason lokiv1.LokiStackConditionReason
Requeue bool
}
return updateCondition(ctx, k, req, pending)
func (e *DegradedError) Error() string {
return fmt.Sprintf("cluster degraded: %s", e.Message)
}
// SetDegradedCondition appends the condition Degraded to the lokistack status conditions.
@ -78,6 +61,38 @@ func SetDegradedCondition(ctx context.Context, k k8s.Client, req ctrl.Request, m
return updateCondition(ctx, k, req, degraded)
}
func generateCondition(cs *lokiv1.LokiStackComponentStatus) metav1.Condition {
// Check for failed pods first
failed := len(cs.Compactor[corev1.PodFailed]) +
len(cs.Distributor[corev1.PodFailed]) +
len(cs.Ingester[corev1.PodFailed]) +
len(cs.Querier[corev1.PodFailed]) +
len(cs.QueryFrontend[corev1.PodFailed]) +
len(cs.Gateway[corev1.PodFailed]) +
len(cs.IndexGateway[corev1.PodFailed]) +
len(cs.Ruler[corev1.PodFailed])
if failed != 0 {
return conditionFailed
}
// Check for pending pods
pending := len(cs.Compactor[corev1.PodPending]) +
len(cs.Distributor[corev1.PodPending]) +
len(cs.Ingester[corev1.PodPending]) +
len(cs.Querier[corev1.PodPending]) +
len(cs.QueryFrontend[corev1.PodPending]) +
len(cs.Gateway[corev1.PodPending]) +
len(cs.IndexGateway[corev1.PodPending]) +
len(cs.Ruler[corev1.PodPending])
if pending != 0 {
return conditionPending
}
return conditionReady
}
func updateCondition(ctx context.Context, k k8s.Client, req ctrl.Request, condition metav1.Condition) error {
var stack lokiv1.LokiStack
if err := k.Get(ctx, req.NamespacedName, &stack); err != nil {

@ -6,9 +6,8 @@ import (
lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/external/k8s/k8sfakes"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -39,391 +38,6 @@ func setupFakesNoError(t *testing.T, stack *lokiv1.LokiStack) (*k8sfakes.FakeCli
return k, sw
}
func TestSetReadyCondition_WhenGetLokiStackReturnsError_ReturnError(t *testing.T) {
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewBadRequest("something wasn't found")
}
err := SetReadyCondition(context.Background(), k, r)
require.Error(t, err)
}
func TestSetReadyCondition_WhenGetLokiStackReturnsNotFound_DoNothing(t *testing.T) {
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
err := SetReadyCondition(context.Background(), k, r)
require.NoError(t, err)
}
func TestSetReadyCondition_WhenExisting_DoNothing(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Status: lokiv1.LokiStackStatus{
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionReady),
Message: messageReady,
Reason: string(lokiv1.ReasonReadyComponents),
Status: metav1.ConditionTrue,
},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, _ := setupFakesNoError(t, &s)
err := SetReadyCondition(context.Background(), k, r)
require.NoError(t, err)
require.Zero(t, k.StatusCallCount())
}
func TestSetReadyCondition_WhenExisting_SetReadyConditionTrue(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Status: lokiv1.LokiStackStatus{
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionReady),
Status: metav1.ConditionFalse,
},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, sw := setupFakesNoError(t, &s)
err := SetReadyCondition(context.Background(), k, r)
require.NoError(t, err)
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetReadyCondition_WhenNoneExisting_AppendReadyCondition(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, sw := setupFakesNoError(t, &s)
err := SetReadyCondition(context.Background(), k, r)
require.NoError(t, err)
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetFailedCondition_WhenGetLokiStackReturnsError_ReturnError(t *testing.T) {
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewBadRequest("something wasn't found")
}
err := SetFailedCondition(context.Background(), k, r)
require.Error(t, err)
}
func TestSetFailedCondition_WhenGetLokiStackReturnsNotFound_DoNothing(t *testing.T) {
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
err := SetFailedCondition(context.Background(), k, r)
require.NoError(t, err)
}
func TestSetFailedCondition_WhenExisting_DoNothing(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Status: lokiv1.LokiStackStatus{
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionFailed),
Reason: string(lokiv1.ReasonFailedComponents),
Message: messageFailed,
Status: metav1.ConditionTrue,
},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, _ := setupFakesNoError(t, &s)
err := SetFailedCondition(context.Background(), k, r)
require.NoError(t, err)
require.Zero(t, k.StatusCallCount())
}
func TestSetFailedCondition_WhenExisting_SetFailedConditionTrue(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Status: lokiv1.LokiStackStatus{
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionFailed),
Status: metav1.ConditionFalse,
},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, sw := setupFakesNoError(t, &s)
err := SetFailedCondition(context.Background(), k, r)
require.NoError(t, err)
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetFailedCondition_WhenNoneExisting_AppendFailedCondition(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, sw := setupFakesNoError(t, &s)
err := SetFailedCondition(context.Background(), k, r)
require.NoError(t, err)
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetDegradedCondition_WhenGetLokiStackReturnsError_ReturnError(t *testing.T) {
msg := "tell me nothing"
reason := lokiv1.ReasonMissingObjectStorageSecret
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewBadRequest("something wasn't found")
}
err := SetDegradedCondition(context.Background(), k, r, msg, reason)
require.Error(t, err)
}
func TestSetPendingCondition_WhenGetLokiStackReturnsError_ReturnError(t *testing.T) {
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewBadRequest("something wasn't found")
}
err := SetPendingCondition(context.Background(), k, r)
require.Error(t, err)
}
func TestSetPendingCondition_WhenGetLokiStackReturnsNotFound_DoNothing(t *testing.T) {
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k := &k8sfakes.FakeClient{}
k.GetStub = func(_ context.Context, name types.NamespacedName, object client.Object, _ ...client.GetOption) error {
return apierrors.NewNotFound(schema.GroupResource{}, "something wasn't found")
}
err := SetPendingCondition(context.Background(), k, r)
require.NoError(t, err)
}
func TestSetPendingCondition_WhenExisting_DoNothing(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Status: lokiv1.LokiStackStatus{
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionPending),
Reason: string(lokiv1.ReasonPendingComponents),
Message: messagePending,
Status: metav1.ConditionTrue,
},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, _ := setupFakesNoError(t, &s)
err := SetPendingCondition(context.Background(), k, r)
require.NoError(t, err)
require.Zero(t, k.StatusCallCount())
}
func TestSetPendingCondition_WhenExisting_SetPendingConditionTrue(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
Status: lokiv1.LokiStackStatus{
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionPending),
Status: metav1.ConditionFalse,
},
},
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, sw := setupFakesNoError(t, &s)
err := SetPendingCondition(context.Background(), k, r)
require.NoError(t, err)
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetPendingCondition_WhenNoneExisting_AppendPendingCondition(t *testing.T) {
s := lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
r := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
k, sw := setupFakesNoError(t, &s)
err := SetPendingCondition(context.Background(), k, r)
require.NoError(t, err)
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestSetDegradedCondition_WhenGetLokiStackReturnsNotFound_DoNothing(t *testing.T) {
msg := "tell me nothing"
reason := lokiv1.ReasonMissingObjectStorageSecret
@ -537,3 +151,49 @@ func TestSetDegradedCondition_WhenNoneExisting_AppendDegradedCondition(t *testin
require.NotZero(t, k.StatusCallCount())
require.NotZero(t, sw.UpdateCallCount())
}
func TestGenerateConditions(t *testing.T) {
tt := []struct {
desc string
componentStatus *lokiv1.LokiStackComponentStatus
wantCondition metav1.Condition
}{
{
desc: "no error",
componentStatus: &lokiv1.LokiStackComponentStatus{},
wantCondition: conditionReady,
},
{
desc: "container pending",
componentStatus: &lokiv1.LokiStackComponentStatus{
Ingester: map[corev1.PodPhase][]string{
corev1.PodPending: {
"pod-0",
},
},
},
wantCondition: conditionPending,
},
{
desc: "container failed",
componentStatus: &lokiv1.LokiStackComponentStatus{
Ingester: map[corev1.PodPhase][]string{
corev1.PodFailed: {
"pod-0",
},
},
},
wantCondition: conditionFailed,
},
}
for _, tc := range tt {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
condition := generateCondition(tc.componentStatus)
require.Equal(t, tc.wantCondition, condition)
})
}
}

@ -2,12 +2,14 @@ package status
import (
"context"
"time"
"github.com/ViaQ/logerr/v2/kverrors"
lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/external/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
)
@ -15,56 +17,66 @@ import (
// Refresh executes an aggregate update of the LokiStack Status struct, i.e.
// - It recreates the Status.Components pod status map per component.
// - It sets the appropriate Status.Condition to true that matches the pod status maps.
func Refresh(ctx context.Context, k k8s.Client, req ctrl.Request) error {
if err := SetComponentsStatus(ctx, k, req); err != nil {
return err
}
var s lokiv1.LokiStack
if err := k.Get(ctx, req.NamespacedName, &s); err != nil {
func Refresh(ctx context.Context, k k8s.Client, req ctrl.Request, now time.Time) error {
var stack lokiv1.LokiStack
if err := k.Get(ctx, req.NamespacedName, &stack); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return kverrors.Wrap(err, "failed to lookup lokistack", "name", req.NamespacedName)
}
cs := s.Status.Components
cs, err := generateComponentStatus(ctx, k, &stack)
if err != nil {
return err
}
condition := generateCondition(cs)
// Check for failed pods first
failed := len(cs.Compactor[corev1.PodFailed]) +
len(cs.Distributor[corev1.PodFailed]) +
len(cs.Ingester[corev1.PodFailed]) +
len(cs.Querier[corev1.PodFailed]) +
len(cs.QueryFrontend[corev1.PodFailed]) +
len(cs.Gateway[corev1.PodFailed]) +
len(cs.IndexGateway[corev1.PodFailed]) +
len(cs.Ruler[corev1.PodFailed])
condition.LastTransitionTime = metav1.NewTime(now)
condition.Status = metav1.ConditionTrue
unknown := len(cs.Compactor[corev1.PodUnknown]) +
len(cs.Distributor[corev1.PodUnknown]) +
len(cs.Ingester[corev1.PodUnknown]) +
len(cs.Querier[corev1.PodUnknown]) +
len(cs.QueryFrontend[corev1.PodUnknown]) +
len(cs.Gateway[corev1.PodUnknown]) +
len(cs.IndexGateway[corev1.PodUnknown]) +
len(cs.Ruler[corev1.PodUnknown])
statusUpdater := func(stack *lokiv1.LokiStack) {
stack.Status.Components = *cs
if failed != 0 || unknown != 0 {
return SetFailedCondition(ctx, k, req)
}
index := -1
for i := range stack.Status.Conditions {
// Reset all other conditions first
stack.Status.Conditions[i].Status = metav1.ConditionFalse
stack.Status.Conditions[i].LastTransitionTime = metav1.NewTime(now)
// Check for pending pods
pending := len(cs.Compactor[corev1.PodPending]) +
len(cs.Distributor[corev1.PodPending]) +
len(cs.Ingester[corev1.PodPending]) +
len(cs.Querier[corev1.PodPending]) +
len(cs.QueryFrontend[corev1.PodPending]) +
len(cs.Gateway[corev1.PodPending]) +
len(cs.IndexGateway[corev1.PodPending]) +
len(cs.Ruler[corev1.PodPending])
// Locate existing pending condition if any
if stack.Status.Conditions[i].Type == condition.Type {
index = i
}
}
if pending != 0 {
return SetPendingCondition(ctx, k, req)
if index == -1 {
stack.Status.Conditions = append(stack.Status.Conditions, condition)
} else {
stack.Status.Conditions[index] = condition
}
}
statusUpdater(&stack)
err = k.Status().Update(ctx, &stack)
switch {
case err == nil:
return nil
case apierrors.IsConflict(err):
// break into retry-logic below on conflict
break
case err != nil:
// return non-conflict errors
return err
}
return SetReadyCondition(ctx, k, req)
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := k.Get(ctx, req.NamespacedName, &stack); err != nil {
return err
}
statusUpdater(&stack)
return k.Status().Update(ctx, &stack)
})
}

@ -0,0 +1,83 @@
package status
import (
"context"
"testing"
"time"
lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/manifests"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)
func TestRefreshSuccess(t *testing.T) {
now := time.Now()
stack := &lokiv1.LokiStack{
ObjectMeta: metav1.ObjectMeta{
Name: "my-stack",
Namespace: "some-ns",
},
}
req := ctrl.Request{
NamespacedName: types.NamespacedName{
Name: "my-stack",
Namespace: "some-ns",
},
}
componentPods := map[string]*corev1.PodList{
manifests.LabelCompactorComponent: createPodList(manifests.LabelCompactorComponent, corev1.PodRunning),
manifests.LabelDistributorComponent: createPodList(manifests.LabelDistributorComponent, corev1.PodRunning),
manifests.LabelIngesterComponent: createPodList(manifests.LabelIngesterComponent, corev1.PodRunning),
manifests.LabelQuerierComponent: createPodList(manifests.LabelQuerierComponent, corev1.PodRunning),
manifests.LabelQueryFrontendComponent: createPodList(manifests.LabelQueryFrontendComponent, corev1.PodRunning),
manifests.LabelIndexGatewayComponent: createPodList(manifests.LabelIndexGatewayComponent, corev1.PodRunning),
manifests.LabelRulerComponent: createPodList(manifests.LabelRulerComponent, corev1.PodRunning),
manifests.LabelGatewayComponent: createPodList(manifests.LabelGatewayComponent, corev1.PodRunning),
}
wantStatus := lokiv1.LokiStackStatus{
Components: lokiv1.LokiStackComponentStatus{
Compactor: map[corev1.PodPhase][]string{corev1.PodRunning: {"compactor-pod-0"}},
Distributor: map[corev1.PodPhase][]string{corev1.PodRunning: {"distributor-pod-0"}},
IndexGateway: map[corev1.PodPhase][]string{corev1.PodRunning: {"index-gateway-pod-0"}},
Ingester: map[corev1.PodPhase][]string{corev1.PodRunning: {"ingester-pod-0"}},
Querier: map[corev1.PodPhase][]string{corev1.PodRunning: {"querier-pod-0"}},
QueryFrontend: map[corev1.PodPhase][]string{corev1.PodRunning: {"query-frontend-pod-0"}},
Gateway: map[corev1.PodPhase][]string{corev1.PodRunning: {"lokistack-gateway-pod-0"}},
Ruler: map[corev1.PodPhase][]string{corev1.PodRunning: {"ruler-pod-0"}},
},
Storage: lokiv1.LokiStackStorageStatus{},
Conditions: []metav1.Condition{
{
Type: string(lokiv1.ConditionReady),
Reason: string(lokiv1.ReasonReadyComponents),
Message: messageReady,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now),
},
},
}
k, sw := setupListClient(t, stack, componentPods)
err := Refresh(context.Background(), k, req, now)
require.NoError(t, err)
require.Equal(t, 1, k.GetCallCount())
require.Equal(t, 8, k.ListCallCount())
require.Equal(t, 1, sw.UpdateCallCount())
_, updated, _ := sw.UpdateArgsForCall(0)
updatedStack, ok := updated.(*lokiv1.LokiStack)
if !ok {
t.Fatalf("not a LokiStack: %T", updatedStack)
}
require.Equal(t, wantStatus, updatedStack.Status)
}
Loading…
Cancel
Save