Skip to content

Commit

Permalink
Remote image state sync (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao committed Sep 25, 2023
1 parent 8b575b9 commit 9592ecc
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 174 deletions.
34 changes: 29 additions & 5 deletions castai/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,24 @@ const (
headerContentEncoding = "Content-Encoding"
totalSendDeltaTimeout = 2 * time.Minute

ReportTypeDelta = "delta"
ReportTypeCis = "cis-report"
ReportTypeLinter = "linter-checks"
ReportTypeImageMeta = "image-metadata"
ReportTypeCloudScan = "cloud-scan"
ReportTypeImagesResourcesChange = "images-resources-change"
ReportTypeDelta = "delta"
ReportTypeCis = "cis-report"
ReportTypeLinter = "linter-checks"
ReportTypeImageMeta = "image-metadata"
ReportTypeCloudScan = "cloud-scan"
)

// Client lists the backend API operations used by the agent. The unexported
// client type in this file implements the methods shown below.
type Client interface {
SendLogs(ctx context.Context, req *LogEvent) error
// SendImagesResourcesChange submits report through the shared report
// sender using the ReportTypeImagesResourcesChange report type.
SendImagesResourcesChange(ctx context.Context, report *ImagesResourcesChange) error
SendCISReport(ctx context.Context, report *KubeBenchReport) error
// SendDeltaReport submits report through the shared report sender using
// the ReportTypeDelta report type.
SendDeltaReport(ctx context.Context, report *Delta) error
SendLinterChecks(ctx context.Context, checks []LinterCheck) error
SendImageMetadata(ctx context.Context, meta *ImageMetadata) error
SendCISCloudScanReport(ctx context.Context, report *CloudScanReport) error
PostTelemetry(ctx context.Context, initial bool) (*TelemetryResponse, error)
// GetSyncState posts filter to the cluster's sync-state endpoint and
// returns the decoded response, or an error if the call, the HTTP status
// or the JSON decoding fails.
GetSyncState(ctx context.Context, filter *SyncStateFilter) (*SyncStateResponse, error)
}

func NewClient(
Expand Down Expand Up @@ -142,6 +145,10 @@ func (c *client) SendLogs(ctx context.Context, req *LogEvent) error {
return nil
}

// SendImagesResourcesChange delivers the given images resources change report
// via sendReport, tagging it with the ReportTypeImagesResourcesChange type.
// Any error returned by sendReport is passed back unchanged.
func (c *client) SendImagesResourcesChange(ctx context.Context, report *ImagesResourcesChange) error {
	const reportType = ReportTypeImagesResourcesChange
	return c.sendReport(ctx, report, reportType)
}

// SendDeltaReport delivers the given delta report via sendReport, tagging it
// with the ReportTypeDelta type. Any error returned by sendReport is passed
// back unchanged.
func (c *client) SendDeltaReport(ctx context.Context, report *Delta) error {
	const reportType = ReportTypeDelta
	return c.sendReport(ctx, report, reportType)
}
Expand Down Expand Up @@ -237,3 +244,20 @@ func (c *client) sendReport(ctx context.Context, report any, reportType string)

return nil
}

// GetSyncState posts filter as the request body to the sync-state endpoint of
// the client's cluster and decodes the JSON response.
//
// It returns an error when the request cannot be performed, when the server
// replies with an error status (the status code and raw body are included in
// the message), or when the response body is not valid JSON for
// SyncStateResponse.
func (c *client) GetSyncState(ctx context.Context, filter *SyncStateFilter) (*SyncStateResponse, error) {
	endpoint := fmt.Sprintf("/v1/security/insights/%s/sync-state", c.clusterID)

	httpResp, err := c.restClient.R().
		SetContext(ctx).
		SetBody(filter).
		Post(endpoint)
	if err != nil {
		return nil, fmt.Errorf("calling sync state: %w", err)
	}
	if httpResp.IsError() {
		return nil, fmt.Errorf("calling sync state: request error status_code=%d body=%s", httpResp.StatusCode(), httpResp.Body())
	}

	state := new(SyncStateResponse)
	if err := json.Unmarshal(httpResp.Body(), state); err != nil {
		return nil, fmt.Errorf("unmarshal sync state: %w", err)
	}
	return state, nil
}
31 changes: 23 additions & 8 deletions castai/imagemeta_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
)

type ImageMetadata struct {
ImageName string `json:"imageName,omitempty"`
ImageID string `json:"imageID,omitempty"`
ImageDigest string `json:"imageDigest,omitempty"`
ResourceIDs []string `json:"resourceIDs,omitempty"`
RemovedResourceIDs []string `json:"removedResourceIDs,omitempty"`
Architecture string `json:"architecture,omitempty"`
BlobsInfo []types.BlobInfo `json:"blobsInfo,omitempty"`
ConfigFile *v1.ConfigFile `json:"configFile,omitempty"`
ImageName string `json:"imageName,omitempty"`
ImageID string `json:"imageID,omitempty"`
ImageDigest string `json:"imageDigest,omitempty"`
ResourceIDs []string `json:"resourceIDs,omitempty"`
Architecture string `json:"architecture,omitempty"`
BlobsInfo []types.BlobInfo `json:"blobsInfo,omitempty"`
ConfigFile *v1.ConfigFile `json:"configFile,omitempty"`
// Manifest specification can be found here: https://github.com/opencontainers/image-spec/blob/main/manifest.md
Manifest *v1.Manifest `json:"manifest,omitempty"`
// Index specification can be found here: https://github.com/opencontainers/image-spec/blob/main/image-index.md
Expand All @@ -26,3 +25,19 @@ type OsInfo struct {
*types.ArtifactInfo `json:",inline"`
*types.OS `json:",inline"`
}

// ImagesResourcesChange is the report payload accepted by
// SendImagesResourcesChange.
type ImagesResourcesChange struct {
// FullSnapshot is dropped from the JSON output when false (omitempty).
// NOTE(review): presumably marks the report as a complete state rather
// than an incremental change — confirm against the backend contract.
FullSnapshot bool `json:"full_snapshot,omitempty"`
// Images is always serialized; a nil slice is encoded as JSON null.
Images []Image `json:"images"`
}

// Image is a single entry of ImagesResourcesChange.Images: an image
// identified by ID and architecture together with its resources change.
type Image struct {
ID string `json:"id"`
Architecture string `json:"architecture"`
// ResourcesChange carries the resource ID lists associated with this image.
ResourcesChange ResourcesChange `json:"resourcesChange"`
}

// ResourcesChange holds two resource ID lists for an image. Neither field
// uses omitempty, so both keys are always present in the JSON output (nil
// slices are encoded as null).
type ResourcesChange struct {
// NOTE(review): presumably the IDs of resources currently using the image — confirm.
ResourceIDs []string `json:"resourceIDs"`
// NOTE(review): presumably the IDs of resources that stopped using the image — confirm.
RemovedResourceIDs []string `json:"removedResourceIDs"`
}
29 changes: 29 additions & 0 deletions castai/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions castai/syncstate_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package castai

// SyncStateFilter is sent as the request body of the sync-state call.
type SyncStateFilter struct {
// ImagesIds is serialized under the "imagesIds" key.
// NOTE(review): presumably restricts the returned state to these image IDs — confirm.
ImagesIds []string `json:"imagesIds"`
}

// SyncStateResponse is the decoded body of a successful sync-state call.
type SyncStateResponse struct {
// Images stays nil when the "images" key is absent or null in the
// response, so callers should check it before dereferencing.
Images *ImagesSyncState `json:"images"`
}

// ImagesSyncState is the images section of SyncStateResponse.
type ImagesSyncState struct {
// NOTE(review): presumably asks the agent to resend the full resources
// state for its images — confirm against the backend contract.
FullResourcesResyncRequired bool `json:"fullResourcesResyncRequired"`
// ScannedImages lists the image entries returned by the remote side.
ScannedImages []ScannedImage `json:"scannedImages"`
}

// ScannedImage describes one image entry reported by the remote side,
// identified by its ID and architecture, along with the resource IDs
// attached to it.
type ScannedImage struct {
	ID           string   `json:"id"`
	Architecture string   `json:"architecture"`
	ResourceIDs  []string `json:"resourceIds"`
}

// CacheKey returns the image ID immediately followed by the architecture,
// with no separator between the two parts.
func (s ScannedImage) CacheKey() string {
	key := s.ID
	key += s.Architecture
	return key
}
19 changes: 4 additions & 15 deletions castai/telemetry_types.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
package castai

type TelemetryResponse struct {
DisabledFeatures []string `json:"disabledFeatures"`
FullResync bool `json:"fullResync"`
ScannedImages []ScannedImage `json:"scannedImages"`
NodeIDs []string `json:"nodeIds"`
EnforcedRules []string `json:"enforcedRules"`
}

type ScannedImage struct {
ID string `json:"id"`
Architecture string `json:"architecture"`
ResourceIDs []string `json:"resourceIds"`
}

func (s ScannedImage) CacheKey() string {
return s.ID + s.Architecture
DisabledFeatures []string `json:"disabledFeatures"`
FullResync bool `json:"fullResync"`
NodeIDs []string `json:"nodeIds"`
EnforcedRules []string `json:"enforcedRules"`
}
18 changes: 6 additions & 12 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,12 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli
telemetryManager := telemetry.NewManager(log, castaiClient)

var scannedNodes []string
var scannedImages []castai.ScannedImage
telemetryResponse, err := castaiClient.PostTelemetry(ctx, true)
if err != nil {
log.Warnf("initial telemetry: %v", err)
} else {
cfg = telemetry.ModifyConfig(cfg, telemetryResponse)
scannedNodes = telemetryResponse.NodeIDs
scannedImages = telemetryResponse.ScannedImages
}

linter, err := kubelinter.New(lo.Keys(castai.LinterRuleMap))
Expand Down Expand Up @@ -242,21 +240,17 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli
))
}
if cfg.ImageScan.Enabled {
log.Infof("imagescan enabled, already scanned %d images", len(scannedImages))
if cfg.ImageScan.Force {
scannedImages = []castai.ScannedImage{}
}
deltaState := imagescan.NewDeltaState(scannedImages)
telemetryManager.AddObservers(deltaState.Observe)
objectSubscribers = append(objectSubscribers, imagescan.NewSubscriber(
log.Info("imagescan enabled")
imgScanSubscriber := imagescan.NewSubscriber(
log,
cfg.ImageScan,
imagescan.NewImageScanner(clientSet, cfg, deltaState),
imagescan.NewImageScanner(clientSet, cfg),
castaiClient,
k8sVersion.MinorInt,
deltaState,
))
)
objectSubscribers = append(objectSubscribers, imgScanSubscriber)
}

if len(objectSubscribers) == 0 {
return errors.New("no subscribers enabled")
}
Expand Down
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type ImageScan struct {
CPULimit string `envconfig:"IMAGE_SCAN_CPU_LIMIT" yaml:"cpuLimit"`
MemoryRequest string `envconfig:"IMAGE_SCAN_MEMORY_REQUEST" yaml:"memoryRequest"`
MemoryLimit string `envconfig:"IMAGE_SCAN_MEMORY_LIMIT" yaml:"memoryLimit"`
Force bool `envconfig:"IMAGE_SCAN_FORCE" yaml:"force"`
ProfileEnabled bool `envconfig:"IMAGE_SCAN_PROFILE_ENABLED" yaml:"profileEnabled"`
PhlareEnabled bool `envconfig:"IMAGE_SCAN_PHLARE_ENABLED" yaml:"phlareEnabled"`
PullSecret string `envconfig:"IMAGE_SCAN_PULL_SECRET" yaml:"pullSecret"`
Expand Down
2 changes: 1 addition & 1 deletion e2e/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -ex

KIND_CONTEXT="${KIND_CONTEXT:-kind}"
GOARCH="${GOARCH:-amd64}"
GOARCH="$(go env GOARCH)"

if [ "$IMAGE_TAG" == "" ]
then
Expand Down
60 changes: 14 additions & 46 deletions imagescan/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/castai/kvisor/castai"
imgcollectorconfig "github.com/castai/kvisor/cmd/imgcollector/config"
"github.com/castai/kvisor/controller"
)
Expand All @@ -23,26 +22,6 @@ var (
errNoCandidates = errors.New("no candidates")
)

// buildImageMap turns the remotely reported scanned images into the local
// image map, keyed by each scanned image's CacheKey. Every resulting image is
// marked as scanned and gets one owner entry (with an empty pod ID set) per
// reported resource ID.
func buildImageMap(scannedImages []castai.ScannedImage) map[string]*image {
	result := make(map[string]*image)
	for _, remote := range scannedImages {
		entry := newImage(remote.ID, remote.Architecture)
		entry.scanned = true
		entry.architecture = remote.Architecture

		// One owner per resource ID; pods are not known from the remote state.
		entry.owners = make(map[string]*imageOwner, len(remote.ResourceIDs))
		for _, resourceID := range remote.ResourceIDs {
			entry.owners[resourceID] = &imageOwner{podIDs: map[string]struct{}{}}
		}

		result[remote.CacheKey()] = entry
	}
	return result
}

func newImage(imageID, architecture string) *image {
return &image{
id: imageID,
Expand All @@ -59,14 +38,13 @@ func newImage(imageID, architecture string) *image {
}
}

func NewDeltaState(scannedImages []castai.ScannedImage) *deltaState {
func newDeltaState() *deltaState {
return &deltaState{
queue: make(chan deltaQueueItem, 1000),
remoteImagesUpdate: make(chan []castai.ScannedImage, 3),
images: buildImageMap(scannedImages),
rs: make(map[string]*appsv1.ReplicaSet),
jobs: make(map[string]*batchv1.Job),
nodes: map[string]*node{},
queue: make(chan deltaQueueItem, 1000),
images: map[string]*image{},
rs: make(map[string]*appsv1.ReplicaSet),
jobs: make(map[string]*batchv1.Job),
nodes: map[string]*node{},
}
}

Expand All @@ -80,9 +58,6 @@ type deltaState struct {
// This allows to have lock free access to delta state during image scan.
queue chan deltaQueueItem

// remoteImagesUpdate is signal to update delta images from telemetry.
remoteImagesUpdate chan []castai.ScannedImage

// images holds current cluster images state. image struct contains associated nodes and owners.
images map[string]*image

Expand All @@ -94,12 +69,6 @@ type deltaState struct {
hostFSDisabled bool
}

// Observe forwards the scanned images of a telemetry response to the
// remoteImagesUpdate channel, but only when the response requests a full
// resync and carries at least one scanned image. The send blocks while the
// channel is full.
func (d *deltaState) Observe(response *castai.TelemetryResponse) {
	if !response.FullResync || len(response.ScannedImages) == 0 {
		return
	}
	d.remoteImagesUpdate <- response.ScannedImages
}

func (d *deltaState) upsert(o controller.Object) {
key := controller.ObjectKey(o)
switch v := o.(type) {
Expand Down Expand Up @@ -128,10 +97,6 @@ func (d *deltaState) delete(o controller.Object) {
}
}

// updateImagesFromRemote replaces the whole local image map with one rebuilt
// from the given remotely reported scanned images.
func (d *deltaState) updateImagesFromRemote(images []castai.ScannedImage) {
	rebuilt := buildImageMap(images)
	d.images = rebuilt
}

func (d *deltaState) handlePodUpdate(v *corev1.Pod) {
d.upsertImages(v)
d.updateNodesUsageFromPod(v)
Expand Down Expand Up @@ -305,11 +270,6 @@ func (d *deltaState) getImages() []*image {
return lo.Values(d.images)
}

// getNode looks up a node by name in the delta state. The boolean result
// reports whether the name is present in the nodes map.
func (d *deltaState) getNode(name string) (*node, bool) {
	if n, ok := d.nodes[name]; ok {
		return n, true
	}
	return nil, false
}

func (d *deltaState) updateImage(i *image, change func(img *image)) {
img := d.images[i.cacheKey()]
if img != nil {
Expand Down Expand Up @@ -367,6 +327,12 @@ func (d *deltaState) isHostFsDisabled() bool {
return d.hostFSDisabled
}

// setImageScanned marks the image stored under key as scanned. Keys that are
// not present in the images map are ignored.
func (d *deltaState) setImageScanned(key string) {
	img, found := d.images[key]
	if !found {
		return
	}
	img.scanned = true
}

func getContainerRuntime(containerID string) imgcollectorconfig.Runtime {
parts := strings.Split(containerID, "://")
if len(parts) != 2 {
Expand Down Expand Up @@ -513,6 +479,8 @@ type image struct {
failures int // Used for sorting. We want to scan non-failed images first.
retryBackoff wait.Backoff // Retry state for failed images.
nextScan time.Time // Set based on retry backoff.

lastRemoteSyncAt time.Time // Time when image state was synced from remote.
}

func (img *image) cacheKey() string {
Expand Down
Loading

0 comments on commit 9592ecc

Please sign in to comment.