operator: Fix LokiStackController watches for cluster-scoped resources (#7710)

pull/7716/head^2
Periklis Tsirakidis 3 years ago committed by GitHub
parent f6dabc81c0
commit 57ea330bc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      operator/CHANGELOG.md
  2. 55
      operator/controllers/loki/lokistack_controller.go
  3. 61
      operator/controllers/loki/lokistack_controller_test.go
  4. 7
      operator/internal/external/k8s/builder.go
  5. 80
      operator/internal/external/k8s/k8sfakes/fake_builder.go

@ -1,5 +1,6 @@
## Main
- [7710](https://github.com/grafana/loki/pull/7710) **periklis**: Fix LokiStackController watches for cluster-scoped resources
- [7682](https://github.com/grafana/loki/pull/7682) **periklis**: Refactor cluster proxy to use configv1.Proxy on OpenShift
- [7711](https://github.com/grafana/loki/pull/7711) **Red-GV**: Remove default value from replicationFactor field
- [7617](https://github.com/grafana/loki/pull/7617) **Red-GV**: Modify ingestionRate for respective shirt size

@ -5,17 +5,20 @@ import (
"errors"
"time"
"github.com/google/go-cmp/cmp"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/grafana/loki/operator/controllers/loki/internal/management/state"
"github.com/grafana/loki/operator/internal/external/k8s"
"github.com/grafana/loki/operator/internal/handlers"
"github.com/grafana/loki/operator/internal/status"
openshiftconfigv1 "github.com/openshift/api/config/v1"
routev1 "github.com/openshift/api/route/v1"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
openshiftconfigv1 "github.com/openshift/api/config/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
@ -50,11 +53,21 @@ var (
updateOrDeleteOnlyPred = builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
switch e.ObjectOld.(type) {
case *appsv1.Deployment:
case *appsv1.StatefulSet:
case *openshiftconfigv1.Proxy:
// Update on any change of the RevisionVersion to capture
// status updates for the proxy object. On OpenShift the
// proxy status indicates that values for httpProxy/httpsProxy/noProxy
// are valid after considering the readiness probes to access
// the public net through these proxies.
return true
default:
// Update only if generation change, filter out anything else.
// We only need to check generation change here, because it is only
// updated on spec changes. On the other hand RevisionVersion
// changes also on status changes. We want to omit reconciliation
// for status updates for now.
return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
}
return false
},
CreateFunc: func(e event.CreateEvent) bool { return false },
DeleteFunc: func(e event.DeleteEvent) bool {
@ -191,12 +204,36 @@ func (r *LokiStackReconciler) buildController(bld k8s.Builder) error {
}
if r.FeatureGates.OpenShift.ClusterTLSPolicy {
bld = bld.Owns(&openshiftconfigv1.APIServer{}, updateOrDeleteOnlyPred)
bld = bld.Watches(&source.Kind{Type: &openshiftconfigv1.APIServer{}}, r.enqueueAllLokiStacksHandler(), updateOrDeleteOnlyPred)
}
if r.FeatureGates.OpenShift.ClusterProxy {
bld = bld.Owns(&openshiftconfigv1.Proxy{}, updateOrDeleteOnlyPred)
bld = bld.Watches(&source.Kind{Type: &openshiftconfigv1.Proxy{}}, r.enqueueAllLokiStacksHandler(), updateOrDeleteOnlyPred)
}
return bld.Complete(r)
}
func (r *LokiStackReconciler) enqueueAllLokiStacksHandler() handler.EventHandler {
ctx := context.TODO()
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
lokiStacks := &lokiv1.LokiStackList{}
if err := r.Client.List(ctx, lokiStacks); err != nil {
r.Log.Error(err, "Error getting LokiStack resources in event handler")
return nil
}
var requests []reconcile.Request
for _, stack := range lokiStacks.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: stack.Namespace,
Name: stack.Name,
},
})
}
r.Log.Info("Enqueued requests for all LokiStacks because of global resource change", "count", len(requests), "kind", obj.GetObjectKind())
return requests
})
}

@ -6,12 +6,11 @@ import (
"os"
"testing"
"github.com/ViaQ/logerr/v2/log"
"github.com/go-logr/logr"
configv1 "github.com/grafana/loki/operator/apis/config/v1"
lokiv1 "github.com/grafana/loki/operator/apis/loki/v1"
"github.com/grafana/loki/operator/internal/external/k8s/k8sfakes"
"github.com/ViaQ/logerr/v2/log"
"github.com/go-logr/logr"
openshiftconfigv1 "github.com/openshift/api/config/v1"
routev1 "github.com/openshift/api/route/v1"
"github.com/stretchr/testify/require"
@ -24,6 +23,7 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"
)
var (
@ -167,10 +167,43 @@ func TestLokiStackController_RegisterOwnedResourcesForUpdateOrDeleteOnly(t *test
},
pred: updateOrDeleteOnlyPred,
},
}
for _, tst := range table {
b := &k8sfakes.FakeBuilder{}
b.ForReturns(b)
b.OwnsReturns(b)
b.WatchesReturns(b)
c := &LokiStackReconciler{Client: k, Scheme: scheme, FeatureGates: tst.featureGates}
err := c.buildController(b)
require.NoError(t, err)
// Require Owns-Calls for all owned resources
require.Equal(t, tst.ownCallsCount, b.OwnsCallCount())
// Require Owns-call options to have delete predicate only
obj, opts := b.OwnsArgsForCall(tst.index)
require.Equal(t, tst.obj, obj)
require.Equal(t, tst.pred, opts[0])
}
}
func TestLokiStackController_RegisterWatchedResources(t *testing.T) {
k := &k8sfakes.FakeClient{}
// Require owned resources
type test struct {
index int
watchesCallsCount int
featureGates configv1.FeatureGates
src source.Source
pred builder.OwnsOption
}
table := []test{
{
obj: &openshiftconfigv1.APIServer{},
index: 11,
ownCallsCount: 12,
src: &source.Kind{Type: &openshiftconfigv1.APIServer{}},
index: 0,
watchesCallsCount: 1,
featureGates: configv1.FeatureGates{
OpenShift: configv1.OpenShiftFeatureGates{
ClusterTLSPolicy: true,
@ -179,9 +212,9 @@ func TestLokiStackController_RegisterOwnedResourcesForUpdateOrDeleteOnly(t *test
pred: updateOrDeleteOnlyPred,
},
{
obj: &openshiftconfigv1.Proxy{},
index: 11,
ownCallsCount: 12,
src: &source.Kind{Type: &openshiftconfigv1.Proxy{}},
index: 0,
watchesCallsCount: 1,
featureGates: configv1.FeatureGates{
OpenShift: configv1.OpenShiftFeatureGates{
ClusterProxy: true,
@ -194,17 +227,17 @@ func TestLokiStackController_RegisterOwnedResourcesForUpdateOrDeleteOnly(t *test
b := &k8sfakes.FakeBuilder{}
b.ForReturns(b)
b.OwnsReturns(b)
b.WatchesReturns(b)
c := &LokiStackReconciler{Client: k, Scheme: scheme, FeatureGates: tst.featureGates}
err := c.buildController(b)
require.NoError(t, err)
// Require Owns-Calls for all owned resources
require.Equal(t, tst.ownCallsCount, b.OwnsCallCount())
// Require Watches-calls for all watches resources
require.Equal(t, tst.watchesCallsCount, b.WatchesCallCount())
// Require Owns-call options to have delete predicate only
obj, opts := b.OwnsArgsForCall(tst.index)
require.Equal(t, tst.obj, obj)
src, _, opts := b.WatchesArgsForCall(tst.index)
require.Equal(t, tst.src, src)
require.Equal(t, tst.pred, opts[0])
}
}

@ -5,8 +5,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// Builder is a controller-runtime interface used internally. It copies function from
@ -16,6 +18,7 @@ import (
type Builder interface {
For(object client.Object, opts ...builder.ForOption) Builder
Owns(object client.Object, opts ...builder.OwnsOption) Builder
Watches(src source.Source, handler handler.EventHandler, opts ...builder.WatchesOption) Builder
WithEventFilter(p predicate.Predicate) Builder
WithOptions(options controller.Options) Builder
WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) Builder
@ -42,6 +45,10 @@ func (b *ctrlBuilder) Owns(object client.Object, opts ...builder.OwnsOption) Bui
return &ctrlBuilder{bld: b.bld.Owns(object, opts...)}
}
func (b *ctrlBuilder) Watches(src source.Source, handler handler.EventHandler, opts ...builder.WatchesOption) Builder {
return &ctrlBuilder{bld: b.bld.Watches(src, handler, opts...)}
}
func (b *ctrlBuilder) WithEventFilter(p predicate.Predicate) Builder {
return &ctrlBuilder{bld: b.bld.WithEventFilter(p)}
}

@ -9,8 +9,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
type FakeBuilder struct {
@ -73,6 +75,19 @@ type FakeBuilder struct {
ownsReturnsOnCall map[int]struct {
result1 k8s.Builder
}
WatchesStub func(source.Source, handler.EventHandler, ...builder.WatchesOption) k8s.Builder
watchesMutex sync.RWMutex
watchesArgsForCall []struct {
arg1 source.Source
arg2 handler.EventHandler
arg3 []builder.WatchesOption
}
watchesReturns struct {
result1 k8s.Builder
}
watchesReturnsOnCall map[int]struct {
result1 k8s.Builder
}
WithEventFilterStub func(predicate.Predicate) k8s.Builder
withEventFilterMutex sync.RWMutex
withEventFilterArgsForCall []struct {
@ -420,6 +435,69 @@ func (fake *FakeBuilder) OwnsReturnsOnCall(i int, result1 k8s.Builder) {
}{result1}
}
func (fake *FakeBuilder) Watches(arg1 source.Source, arg2 handler.EventHandler, arg3 ...builder.WatchesOption) k8s.Builder {
fake.watchesMutex.Lock()
ret, specificReturn := fake.watchesReturnsOnCall[len(fake.watchesArgsForCall)]
fake.watchesArgsForCall = append(fake.watchesArgsForCall, struct {
arg1 source.Source
arg2 handler.EventHandler
arg3 []builder.WatchesOption
}{arg1, arg2, arg3})
stub := fake.WatchesStub
fakeReturns := fake.watchesReturns
fake.recordInvocation("Watches", []interface{}{arg1, arg2, arg3})
fake.watchesMutex.Unlock()
if stub != nil {
return stub(arg1, arg2, arg3...)
}
if specificReturn {
return ret.result1
}
return fakeReturns.result1
}
func (fake *FakeBuilder) WatchesCallCount() int {
fake.watchesMutex.RLock()
defer fake.watchesMutex.RUnlock()
return len(fake.watchesArgsForCall)
}
func (fake *FakeBuilder) WatchesCalls(stub func(source.Source, handler.EventHandler, ...builder.WatchesOption) k8s.Builder) {
fake.watchesMutex.Lock()
defer fake.watchesMutex.Unlock()
fake.WatchesStub = stub
}
func (fake *FakeBuilder) WatchesArgsForCall(i int) (source.Source, handler.EventHandler, []builder.WatchesOption) {
fake.watchesMutex.RLock()
defer fake.watchesMutex.RUnlock()
argsForCall := fake.watchesArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3
}
func (fake *FakeBuilder) WatchesReturns(result1 k8s.Builder) {
fake.watchesMutex.Lock()
defer fake.watchesMutex.Unlock()
fake.WatchesStub = nil
fake.watchesReturns = struct {
result1 k8s.Builder
}{result1}
}
func (fake *FakeBuilder) WatchesReturnsOnCall(i int, result1 k8s.Builder) {
fake.watchesMutex.Lock()
defer fake.watchesMutex.Unlock()
fake.WatchesStub = nil
if fake.watchesReturnsOnCall == nil {
fake.watchesReturnsOnCall = make(map[int]struct {
result1 k8s.Builder
})
}
fake.watchesReturnsOnCall[i] = struct {
result1 k8s.Builder
}{result1}
}
func (fake *FakeBuilder) WithEventFilter(arg1 predicate.Predicate) k8s.Builder {
fake.withEventFilterMutex.Lock()
ret, specificReturn := fake.withEventFilterReturnsOnCall[len(fake.withEventFilterArgsForCall)]
@ -616,6 +694,8 @@ func (fake *FakeBuilder) Invocations() map[string][][]interface{} {
defer fake.namedMutex.RUnlock()
fake.ownsMutex.RLock()
defer fake.ownsMutex.RUnlock()
fake.watchesMutex.RLock()
defer fake.watchesMutex.RUnlock()
fake.withEventFilterMutex.RLock()
defer fake.withEventFilterMutex.RUnlock()
fake.withLogConstructorMutex.RLock()

Loading…
Cancel
Save