Skip to content

Commit

Permalink
Merge pull request #164 from castai/multi_ns_cm_informers
Browse files Browse the repository at this point in the history
feat: Use multi namespace informers for ConfigMaps
  • Loading branch information
jansyk13 committed Apr 26, 2024
2 parents 40fddc0 + 8e528f6 commit 02f19f2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 58 deletions.
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type Controller struct {
InitialSleepDuration time.Duration `mapstructure:"initial_sleep_duration"`
HealthySnapshotIntervalLimit time.Duration `mapstructure:"healthy_snapshot_interval_limit"`
InitializationTimeoutExtension time.Duration `mapstructure:"initialization_timeout_extension"`
ConfigMapNamespaces []string `mapstructure:"config_map_namespaces"`
}

type LeaderElectionConfig struct {
Expand Down
70 changes: 47 additions & 23 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
v1 "k8s.io/client-go/informers/core/v1"
Expand Down Expand Up @@ -79,18 +80,26 @@ type Controller struct {
}

type conditionalInformer struct {
// if empty it means all namespaces
namespace string
resource schema.GroupVersionResource
apiType reflect.Type
informerFactory func() cache.SharedIndexInformer
permissionVerbs []string
isApplied bool
}

func (i *conditionalInformer) Name() string {
if i.namespace != "" {
return fmt.Sprintf("Namespace:%s %s", i.namespace, i.resource.String())
}
return i.resource.String()
}

func New(
log logrus.FieldLogger,
f informers.SharedInformerFactory,
df dynamicinformer.DynamicSharedInformerFactory,
discovery discovery.DiscoveryInterface,
clientset kubernetes.Interface,
dynamicClient dynamic.Interface,
castaiclient castai.Client,
metricsClient versioned.Interface,
provider types.Provider,
Expand All @@ -105,8 +114,12 @@ func New(

queue := workqueue.NewNamed("castai-agent")

f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
discovery := clientset.Discovery()

defaultInformers := getDefaultInformers(f)
conditionalInformers := getConditionalInformers(f, df, metricsClient, log)
conditionalInformers := getConditionalInformers(clientset, cfg, f, df, metricsClient, log)

handledInformers := map[string]*custominformers.HandledInformer{}
for typ, i := range defaultInformers {
Expand Down Expand Up @@ -261,19 +274,19 @@ func (c *Controller) startConditionalInformersWithWatcher(ctx context.Context, c
apiResourceListForGroupVersion := getAPIResourceListByGroupVersion(informer.resource.GroupVersion().String(), apiResourceLists)
if !isResourceAvailable(informer.apiType, apiResourceListForGroupVersion) {
c.log.Warnf("Skipping conditional informer name: %v, because API resource is not available",
informer.resource.String(),
informer.Name(),
)
continue
}

if !c.informerHasAccess(ctx, informer) {
c.log.Warnf("Skipping conditional informer name: %v, because required access is not available",
informer.resource.String(),
informer.Name(),
)
continue
}

c.log.Infof("Starting conditional informer for %v", informer.resource.String())
c.log.Infof("Starting conditional informer for %v", informer.Name())
tryConditionalInformers[i].isApplied = true

handledInformer := custominformers.NewHandledInformer(c.log, c.queue, informer.informerFactory(), informer.apiType, nil)
Expand Down Expand Up @@ -423,26 +436,27 @@ func (c *Controller) debugQueueContent(maxItems int) string {

func (c *Controller) informerHasAccess(ctx context.Context, informer conditionalInformer) bool {
// Check if allowed to access all resources with the wildcard "*" verb
if access := c.informerIsAllowedToAccessResource(ctx, "*", informer, informer.resource.Group); access.Status.Allowed {
if access := c.informerIsAllowedToAccessResource(ctx, informer.namespace, "*", informer, informer.resource.Group); access.Status.Allowed {
return true
}

for _, verb := range informer.permissionVerbs {
access := c.informerIsAllowedToAccessResource(ctx, verb, informer, informer.resource.Group)
access := c.informerIsAllowedToAccessResource(ctx, informer.namespace, verb, informer, informer.resource.Group)
if !access.Status.Allowed {
return false
}
}
return true
}

func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, verb string, informer conditionalInformer, groupName string) *authorizationv1.SelfSubjectAccessReview {
func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, namespace, verb string, informer conditionalInformer, groupName string) *authorizationv1.SelfSubjectAccessReview {
access, err := c.selfSubjectAccessReview.Create(ctx, &authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Verb: verb,
Group: groupName,
Resource: informer.resource.Resource,
Namespace: namespace,
Verb: verb,
Group: groupName,
Resource: informer.resource.Resource,
},
},
}, metav1.CreateOptions{})
Expand All @@ -454,16 +468,12 @@ func (c *Controller) informerIsAllowedToAccessResource(ctx context.Context, verb
return access
}

func getConditionalInformers(f informers.SharedInformerFactory, df dynamicinformer.DynamicSharedInformerFactory, metricsClient versioned.Interface, logger logrus.FieldLogger) []conditionalInformer {
return []conditionalInformer{
{
resource: corev1.SchemeGroupVersion.WithResource("configmaps"),
apiType: reflect.TypeOf(&corev1.ConfigMap{}),
permissionVerbs: []string{"get", "list", "watch"},
informerFactory: func() cache.SharedIndexInformer {
return f.Core().V1().ConfigMaps().Informer()
},
},
func (c *Controller) Start(done <-chan struct{}) {
c.informerFactory.Start(done)
}

func getConditionalInformers(clientset kubernetes.Interface, cfg *config.Controller, f informers.SharedInformerFactory, df dynamicinformer.DynamicSharedInformerFactory, metricsClient versioned.Interface, logger logrus.FieldLogger) []conditionalInformer {
conditionalInformers := []conditionalInformer{
{
resource: policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets"),
apiType: reflect.TypeOf(&policyv1.PodDisruptionBudget{}),
Expand Down Expand Up @@ -561,6 +571,20 @@ func getConditionalInformers(f informers.SharedInformerFactory, df dynamicinform
},
},
}

for _, cmNamespace := range cfg.ConfigMapNamespaces {
conditionalInformers = append(conditionalInformers, conditionalInformer{
namespace: cmNamespace,
resource: corev1.SchemeGroupVersion.WithResource("configmaps"),
apiType: reflect.TypeOf(&corev1.ConfigMap{}),
permissionVerbs: []string{"get", "list", "watch"},
informerFactory: func() cache.SharedIndexInformer {
namespaceScopedInformer := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(cmNamespace))
return namespaceScopedInformer.Core().V1().ConfigMaps().Informer()
},
})
}
return conditionalInformers
}

type defaultInformer struct {
Expand Down
79 changes: 52 additions & 27 deletions internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/dynamic/dynamicinformer"
dynamic_fake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
authfakev1 "k8s.io/client-go/kubernetes/typed/authorization/v1/fake"
k8stesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -448,22 +446,28 @@ func TestController_HappyPath(t *testing.T) {

provider.EXPECT().FilterSpot(gomock.Any(), []*v1.Node{node}).Return([]*v1.Node{node}, nil)

f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)

log := logrus.New()
log.SetLevel(logrus.DebugLevel)
ctrl := New(log, f, df, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
},
ctrl := New(
log,
clientset,
dynamicClient,
castaiclient,
metricsClient,
provider,
clusterID.String(),
&config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
ConfigMapNamespaces: []string{v1.NamespaceDefault},
},
version,
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
fakeSelfSubjectAccessReviewsClient,
)
f.Start(ctx.Done())
ctrl.Start(ctx.Done())

go func() {
require.NoError(t, ctrl.Run(ctx))
Expand Down Expand Up @@ -498,16 +502,26 @@ func TestNew(t *testing.T) {
clusterID := uuid.New()
agentVersion := &config.AgentVersion{Version: "1.2.3"}

f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)

log := logrus.New()
log.SetLevel(logrus.DebugLevel)
ctrl := New(log, f, df, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
}, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews())
ctrl := New(
log,
clientset,
dynamicClient,
castaiclient,
metricsClient,
provider,
clusterID.String(),
&config.Controller{
Interval: 15 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
},
version,
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
)

r.NotNil(ctrl)

Expand All @@ -532,8 +546,6 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
clientset := fake.NewSimpleClientset()
metricsClient := metrics_fake.NewSimpleClientset()
dynamicClient := dynamic_fake.NewSimpleDynamicClient(runtime.NewScheme())
f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)

version.EXPECT().Full().Return("1.21+").MaxTimes(3)

Expand Down Expand Up @@ -612,13 +624,26 @@ func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
})

log.SetLevel(logrus.DebugLevel)
ctrl := New(log, f, df, clientset.Discovery(), castaiclient, metricsClient, provider, clusterID.String(), &config.Controller{
Interval: 2 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
}, version, agentVersion, NewHealthzProvider(defaultHealthzCfg, log), clientset.AuthorizationV1().SelfSubjectAccessReviews())
ctrl := New(
log,
clientset,
dynamicClient,
castaiclient,
metricsClient,
provider,
clusterID.String(),
&config.Controller{
Interval: 2 * time.Second,
PrepTimeout: 2 * time.Second,
InitialSleepDuration: 10 * time.Millisecond,
},
version,
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
)

f.Start(ctx.Done())
ctrl.Start(ctx.Done())

go func() {
require.NoError(t, ctrl.Run(ctx))
Expand Down
11 changes: 3 additions & 8 deletions internal/services/controller/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/metrics/pkg/client/clientset/versioned"

Expand Down Expand Up @@ -49,13 +47,10 @@ func Loop(

log = log.WithField("k8s_version", v.Full())

f := informers.NewSharedInformerFactory(clientset, 0)
df := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
ctrl := New(
log,
f,
df,
clientset.Discovery(),
clientset,
dynamicClient,
castaiclient,
metricsClient,
provider,
Expand All @@ -67,7 +62,7 @@ func Loop(
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
)

f.Start(ctrlCtx.Done())
ctrl.Start(ctrlCtx.Done())

// Loop the controller. This is a blocking call.
return ctrl.Run(ctrlCtx)
Expand Down

0 comments on commit 02f19f2

Please sign in to comment.