Skip to content

Commit

Permalink
Add k8s resources spec (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao committed Nov 8, 2023
1 parent 15e9c2a commit 468d2c0
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 35 deletions.
8 changes: 6 additions & 2 deletions castai/delta_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package castai

import (
"time"

json "github.com/json-iterator/go"
)

type EventType string
Expand Down Expand Up @@ -29,8 +31,10 @@ type DeltaItem struct {
ObjectLabels map[string]string `json:"object_labels,omitempty"`

// ObjectContainers and ObjectStatus are set only for objects which could contain containers.
ObjectContainers []Container `json:"object_containers,omitempty"`
ObjectStatus interface{} `json:"object_status,omitempty"`
ObjectContainers []Container `json:"object_containers,omitempty"`
ObjectStatus json.RawMessage `json:"object_status,omitempty"`

ObjectSpec json.RawMessage `json:"object_spec,omitempty"`
}

type Container struct {
Expand Down
36 changes: 23 additions & 13 deletions delta/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package delta

import (
"context"
"fmt"
"testing"
"time"

"github.com/samber/lo"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -21,14 +23,15 @@ func TestSubscriber(t *testing.T) {
log := logrus.New()
log.SetLevel(logrus.DebugLevel)

pod1 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
pod1 := &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "v1"},
ObjectMeta: metav1.ObjectMeta{
Name: "nginx-1",
Namespace: "default",
UID: types.UID("111b56a9-ab5e-4a35-93af-f092e2f63011"),
OwnerReferences: []metav1.OwnerReference{
{
UID: types.UID("owner"),
APIVersion: "v1",
Kind: kindNode,
Controller: lo.ToPtr(true),
Expand All @@ -37,23 +40,29 @@ func TestSubscriber(t *testing.T) {
},
Labels: map[string]string{"subscriber": "test"},
},
Spec: corev1.PodSpec{
NodeName: "n1",
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:1.23",
Spec: appsv1.DeploymentSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
NodeName: "n1",
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:1.23",
},
},
},
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Status: appsv1.DeploymentStatus{
Replicas: 1,
},
}

assertDelta := func(t *testing.T, delta *castai.Delta, event castai.EventType, initial bool) {
t.Helper()
r := require.New(t)
podUID := "111b56a9-ab5e-4a35-93af-f092e2f63011"
fmt.Println(string(delta.Items[0].ObjectSpec))
r.Equal(&castai.Delta{
FullSnapshot: initial,
Items: []castai.DeltaItem{
Expand All @@ -62,7 +71,7 @@ func TestSubscriber(t *testing.T) {
ObjectUID: podUID,
ObjectName: "nginx-1",
ObjectNamespace: "default",
ObjectKind: "Pod",
ObjectKind: "Deployment",
ObjectAPIVersion: "v1",
ObjectLabels: map[string]string{"subscriber": "test"},
ObjectContainers: []castai.Container{
Expand All @@ -71,8 +80,9 @@ func TestSubscriber(t *testing.T) {
ImageName: "nginx:1.23",
},
},
ObjectStatus: corev1.PodStatus{Phase: corev1.PodRunning},
ObjectOwnerUID: podUID,
ObjectStatus: []byte(`{"replicas":1}`),
ObjectOwnerUID: "owner",
ObjectSpec: []byte(`{"selector":null,"template":{"metadata":{"creationTimestamp":null},"spec":{"containers":[{"name":"nginx","image":"nginx:1.23","resources":{}}],"nodeName":"n1"}},"strategy":{}}`),
},
},
}, delta)
Expand Down
75 changes: 55 additions & 20 deletions delta/delta.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package delta

import (
json "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

Expand Down Expand Up @@ -73,10 +75,25 @@ func (d *delta) add(event kube.Event, obj object) {
ObjectOwnerUID: d.getOwnerUID(obj),
ObjectLabels: obj.GetLabels(),
}
if containers, status, ok := getContainersAndStatus(obj); ok {
deltaItem.ObjectContainers = containers

containers, status, err := getContainersAndStatus(obj)
if err != nil {
d.log.Errorf("getting object status json: %v", err)
}
if len(status) > 0 {
deltaItem.ObjectStatus = status
}
if len(containers) > 0 {
deltaItem.ObjectContainers = containers
}

spec, err := getObjectSpec(obj)
if err != nil {
d.log.Errorf("getting object spec json: %v", err)
}
if len(spec) > 0 {
deltaItem.ObjectSpec = spec
}

d.cache[key] = deltaItem
d.snapshot.append(deltaItem)
Expand Down Expand Up @@ -111,34 +128,47 @@ func toCASTAIEvent(e kube.Event) castai.EventType {
return ""
}

func getContainersAndStatus(obj kube.Object) ([]castai.Container, interface{}, bool) {
func (d *delta) getOwnerUID(obj kube.Object) string {
switch v := obj.(type) {
case *corev1.Pod:
return d.podOwnerGetter.GetPodOwnerID(v)
}

if len(obj.GetOwnerReferences()) == 0 {
return ""
}
return string(obj.GetOwnerReferences()[0].UID)
}

func getContainersAndStatus(obj kube.Object) ([]castai.Container, []byte, error) {
var containers []corev1.Container
appendContainers := func(podSpec corev1.PodSpec) {
containers = append(containers, podSpec.Containers...)
containers = append(containers, podSpec.InitContainers...)
}
var st interface{}
var st []byte
var err error
switch v := obj.(type) {
case *batchv1.Job:
st = v.Status
st, err = json.Marshal(v.Status)
appendContainers(v.Spec.Template.Spec)
case *batchv1.CronJob:
st = v.Status
st, err = json.Marshal(v.Status)
appendContainers(v.Spec.JobTemplate.Spec.Template.Spec)
case *corev1.Pod:
st = v.Status
st, err = json.Marshal(v.Status)
appendContainers(v.Spec)
case *appsv1.Deployment:
st = v.Status
st, err = json.Marshal(v.Status)
appendContainers(v.Spec.Template.Spec)
case *appsv1.StatefulSet:
st = v.Status
st, err = json.Marshal(v.Status)
appendContainers(v.Spec.Template.Spec)
case *appsv1.DaemonSet:
st = v.Status
st, err = json.Marshal(v.Status)
appendContainers(v.Spec.Template.Spec)
default:
return nil, nil, false
return nil, nil, nil
}

res := make([]castai.Container, len(containers))
Expand All @@ -148,17 +178,22 @@ func getContainersAndStatus(obj kube.Object) ([]castai.Container, interface{}, b
ImageName: cont.Image,
}
}
return res, st, true
return res, st, err
}

func (d *delta) getOwnerUID(obj kube.Object) string {
func getObjectSpec(obj object) ([]byte, error) {
switch v := obj.(type) {
case *corev1.Pod:
return d.podOwnerGetter.GetPodOwnerID(v)
}

if len(obj.GetOwnerReferences()) == 0 {
return ""
case *networkingv1.Ingress:
return json.Marshal(v.Spec)
case *corev1.Service:
return json.Marshal(v.Spec)
case *appsv1.Deployment:
return json.Marshal(v.Spec)
case *appsv1.StatefulSet:
return json.Marshal(v.Spec)
case *appsv1.DaemonSet:
return json.Marshal(v.Spec)
default:
return nil, nil
}
return string(obj.GetOwnerReferences()[0].UID)
}

0 comments on commit 468d2c0

Please sign in to comment.