From 8e528f6f038f787b65717037966ef84d4be6e3c4 Mon Sep 17 00:00:00 2001 From: Jan Sykora Date: Thu, 25 Apr 2024 21:06:44 +0200 Subject: [PATCH] feat: Use multi namespace informers for ConfigMaps --- internal/config/config.go | 1 + internal/services/controller/controller.go | 70 ++++++++++------ .../services/controller/controller_test.go | 79 ++++++++++++------- internal/services/controller/worker.go | 11 +-- 4 files changed, 103 insertions(+), 58 deletions(-) diff --git a/internal/config/config.go b/internal/config/config.go index 5f299f4..92c9e02 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -113,6 +113,7 @@ type Controller struct { InitialSleepDuration time.Duration `mapstructure:"initial_sleep_duration"` HealthySnapshotIntervalLimit time.Duration `mapstructure:"healthy_snapshot_interval_limit"` InitializationTimeoutExtension time.Duration `mapstructure:"initialization_timeout_extension"` + ConfigMapNamespaces []string `mapstructure:"config_map_namespaces"` } type LeaderElectionConfig struct { diff --git a/internal/services/controller/controller.go b/internal/services/controller/controller.go index 50a87a8..be21113 100644 --- a/internal/services/controller/controller.go +++ b/internal/services/controller/controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" v1 "k8s.io/client-go/informers/core/v1" @@ -79,6 +80,8 @@ type Controller struct { } type conditionalInformer struct { + // if empty it means all namespaces + namespace string resource schema.GroupVersionResource apiType reflect.Type informerFactory func() cache.SharedIndexInformer @@ -86,11 +89,17 @@ type conditionalInformer struct { isApplied bool } +func (i *conditionalInformer) Name() string { + if i.namespace != "" { + return fmt.Sprintf("Namespace:%s %s", i.namespace, i.resource.String()) + } + return i.resource.String() +} + func New( log logrus.FieldLogger, - f informers.SharedInformerFactory, - df dynamicinformer.DynamicSharedInformerFactory, - discovery discovery.DiscoveryInterface, + clientset kubernetes.Interface, + dynamicClient dynamic.Interface, castaiclient castai.Client, metricsClient versioned.Interface, provider types.Provider, @@ -105,8 +114,12 @@ func New( queue := workqueue.NewNamed("castai-agent") + f := informers.NewSharedInformerFactory(clientset, 0) + df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) + discovery := clientset.Discovery() + defaultInformers := getDefaultInformers(f) - conditionalInformers := getConditionalInformers(f, df, metricsClient, log) + conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log) handledInformers := map[string]*custominformers.HandledInformer{} for typ, i := range defaultInformers { @@ -261,19 +274,19 @@ func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, c apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(informer.resource.GroupVersion().String(), apiResourceLists) if !isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) { c.log.Warnf("Skipping conditional informer name: %v, because API resource is not available", - informer.resource.String(), + informer.Name(), ) continue } if !c.informerHasAccess(ctx, informer) { c.log.Warnf("Skipping conditional informer name: %v, because required access is not available", - informer.resource.String(), + informer.Name(), ) continue } - c.log.Infof("Starting conditional informer for %v", informer.resource.String()) + c.log.Infof("Starting conditional informer for %v", informer.Name()) tryConditionalInformers[i].isApplied = true handledInformer := custominformers.NewHandledInformer(c.log, c.queue, informer.informerFactory(), informer.apiType, nil) @@ -423,12 +436,12 @@ func (c *Controller) debugQueueContent(maxItems int) string { func (c *Controller) informerHasAccess(ctx context.Context, informer conditionalInformer) bool { // Check if allowed to access all resources with the wildcard "*" verb - if access := c.informerIsAllowedToAccessResource(ctx, "*", informer, informer.resource.Group); access.Status.Allowed { + if access := c.informerIsAllowedToAccessResource(ctx, informer.namespace, "*", informer, informer.resource.Group); access.Status.Allowed { return true } for _, verb := range informer.permissionVerbs { - access := c.informerIsAllowedToAccessResource(ctx, verb, informer, informer.resource.Group) + access := c.informerIsAllowedToAccessResource(ctx, informer.namespace, verb, informer, informer.resource.Group) if !access.Status.Allowed { return false } @@ -436,13 +449,14 @@ func (c *Controller) informerHasAccess(ctx context.Context, informer conditional return true } -func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, verb string, informer conditionalInformer, groupName string) *authorizationv1.SelfSubjectAccessReview { +func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, namespace, verb string, informer conditionalInformer, groupName string) *authorizationv1.SelfSubjectAccessReview { access, err := c.selfSubjectAccessReview.Create(ctx, &authorizationv1.SelfSubjectAccessReview{ Spec: authorizationv1.SelfSubjectAccessReviewSpec{ ResourceAttributes: &authorizationv1.ResourceAttributes{ - Verb: verb, - Group: groupName, - Resource: informer.resource.Resource, + Namespace: namespace, + Verb: verb, + Group: groupName, + Resource: informer.resource.Resource, }, }, }, metav1.CreateOptions{}) @@ -454,16 +468,12 @@ func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, verb return access } -func getConditionalInformers(f informers.SharedInformerFactory, df dynamicinformer.DynamicSharedInformerFactory, metricsClient versioned.Interface, logger logrus.FieldLogger) []conditionalInformer { - return []conditionalInformer{ - { - resource: corev1.SchemeGroupVersion.WithResource("configmaps"), - apiType: reflect.TypeOf(&corev1.ConfigMap{}), - permissionVerbs: []string{"get", "list", "watch"}, - informerFactory: func() cache.SharedIndexInformer { - return f.Core().V1().ConfigMaps().Informer() - }, - }, +func (c *Controller) Start(done <-chan struct{}) { + c.informerFactory.Start(done) +} + +func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Controller, f informers.SharedInformerFactory, df dynamicinformer.DynamicSharedInformerFactory, metricsClient versioned.Interface, logger logrus.FieldLogger) []conditionalInformer { + conditionalInformers := []conditionalInformer{ { resource: policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"), apiType: reflect.TypeOf(&policyv1.PodDisruptionBudget{}), @@ -561,6 +571,20 @@ func getConditionalInformers(f informers.SharedInformerFactory, df dynamicinform }, }, } + + for _, cmNamespace := range cfg.ConfigMapNamespaces { + conditionalInformers = append(conditionalInformers, conditionalInformer{ + namespace: cmNamespace, + resource: corev1.SchemeGroupVersion.WithResource("configmaps"), + apiType: reflect.TypeOf(&corev1.ConfigMap{}), + permissionVerbs: []string{"get", "list", "watch"}, + informerFactory: func() cache.SharedIndexInformer { + namespaceScopedInformer := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(cmNamespace)) + return namespaceScopedInformer.Core().V1().ConfigMaps().Informer() + }, + }) + } + return conditionalInformers } type defaultInformer struct { diff --git a/internal/services/controller/controller_test.go b/internal/services/controller/controller_test.go index 6db287f..219d3e2 100644 --- a/internal/services/controller/controller_test.go +++ b/internal/services/controller/controller_test.go @@ -30,9 +30,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" fakediscovery "k8s.io/client-go/discovery/fake" - "k8s.io/client-go/dynamic/dynamicinformer" dynamic_fake "k8s.io/client-go/dynamic/fake" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" authfakev1 "k8s.io/client-go/kubernetes/typed/authorization/v1/fake" k8stesting "k8s.io/client-go/testing" @@ -448,22 +446,28 @@ func TestController_HappyPath(t *testing.T) { provider.EXPECT().FilterSpot(gomock.Any(), []*v1.Node{node}).Return([]*v1.Node{node}, nil) - f := informers.NewSharedInformerFactory(clientset, 0) - df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) - log := logrus.New() log.SetLevel(logrus.DebugLevel) - ctrl := New(log, f, df, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{ - Interval: 15 * time.Second, - PrepTimeout: 2 * time.Second, - InitialSleepDuration: 10 * time.Millisecond, - }, + ctrl := New( + log, + clientset, + dynamicClient, + castaiclient, + metricsClient, + provider, + clusterID.String(), + &config.Controller{ + Interval: 15 * time.Second, + PrepTimeout: 2 * time.Second, + InitialSleepDuration: 10 * time.Millisecond, + ConfigMapNamespaces: []string{v1.NamespaceDefault}, + }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), fakeSelfSubjectAccessReviewsClient, ) - f.Start(ctx.Done()) + ctrl.Start(ctx.Done()) go func() { require.NoError(t, ctrl.Run(ctx)) @@ -498,16 +502,26 @@ func TestNew(t *testing.T) { clusterID := uuid.New() agentVersion := &config.AgentVersion{Version: "1.2.3"} - f := informers.NewSharedInformerFactory(clientset, 0) - df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) - log := logrus.New() log.SetLevel(logrus.DebugLevel) - ctrl := New(log, f, df, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{ - Interval: 15 * time.Second, - PrepTimeout: 2 * time.Second, - InitialSleepDuration: 10 * time.Millisecond, - }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews()) + ctrl := New( + log, + clientset, + dynamicClient, + castaiclient, + metricsClient, + provider, + clusterID.String(), + &config.Controller{ + Interval: 15 * time.Second, + PrepTimeout: 2 * time.Second, + InitialSleepDuration: 10 * time.Millisecond, + }, + version, + agentVersion, + NewHealthzProvider(defaultHealthzCfg, log), + clientset.AuthorizationV1().SelfSubjectAccessReviews(), + ) r.NotNil(ctrl) @@ -532,8 +546,6 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) { clientset := fake.NewSimpleClientset() metricsClient := metrics_fake.NewSimpleClientset() dynamicClient := dynamic_fake.NewSimpleDynamicClient(runtime.NewScheme()) - f := informers.NewSharedInformerFactory(clientset, 0) - df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) version.EXPECT().Full().Return("1.21+").MaxTimes(3) @@ -612,13 +624,26 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) { }) log.SetLevel(logrus.DebugLevel) - ctrl := New(log, f, df, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{ - Interval: 2 * time.Second, - PrepTimeout: 2 * time.Second, - InitialSleepDuration: 10 * time.Millisecond, - }, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews()) + ctrl := New( + log, + clientset, + dynamicClient, + castaiclient, + metricsClient, + provider, + clusterID.String(), + &config.Controller{ + Interval: 2 * time.Second, + PrepTimeout: 2 * time.Second, + InitialSleepDuration: 10 * time.Millisecond, + }, + version, + agentVersion, + NewHealthzProvider(defaultHealthzCfg, log), + clientset.AuthorizationV1().SelfSubjectAccessReviews(), + ) - f.Start(ctx.Done()) + ctrl.Start(ctx.Done()) go func() { require.NoError(t, ctrl.Run(ctx)) diff --git a/internal/services/controller/worker.go b/internal/services/controller/worker.go index d0455cc..e835962 100644 --- a/internal/services/controller/worker.go +++ b/internal/services/controller/worker.go @@ -7,8 +7,6 @@ import ( "github.com/google/uuid" "github.com/sirupsen/logrus" "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/metrics/pkg/client/clientset/versioned" @@ -49,13 +47,10 @@ func Loop( log = log.WithField("k8s_version", v.Full()) - f := informers.NewSharedInformerFactory(clientset, 0) - df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) ctrl := New( log, - f, - df, - clientset.Discovery(), + clientset, + dynamicClient, castaiclient, metricsClient, provider, @@ -67,7 +62,7 @@ func Loop( clientset.AuthorizationV1().SelfSubjectAccessReviews(), ) - f.Start(ctrlCtx.Done()) + ctrl.Start(ctrlCtx.Done()) // Loop the controller. This is a blocking call. return ctrl.Run(ctrlCtx)