Skip to content

Commit

Permalink
Use CAST AI managed nodes for scans (#176)
Browse files Browse the repository at this point in the history
* Use CAST AI managed nodes for scans
  • Loading branch information
damejeras committed Oct 23, 2023
1 parent e62d812 commit 92c9a97
Show file tree
Hide file tree
Showing 17 changed files with 309 additions and 78 deletions.
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ linters:
- varcheck
- prealloc
- contextcheck
- protogetter
presets:
- bugs
- performance
Expand Down
8 changes: 5 additions & 3 deletions cmd/imgcollector/analyzer/pkg/apk/apk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aquasecurity/trivy/pkg/fanal/types"
)
Expand Down Expand Up @@ -284,7 +284,9 @@ func TestParseApkInfo(t *testing.T) {
}
scanner := bufio.NewScanner(read)
gotPkgs, gotFiles := a.parseApkInfo(scanner)
assert.Equal(t, v.wantPkgs, gotPkgs)
assert.Equal(t, v.wantFiles, gotFiles)

r := require.New(t)
r.Equal(v.wantPkgs, gotPkgs)
r.Equal(v.wantFiles, gotFiles)
}
}
11 changes: 6 additions & 5 deletions cmd/imgcollector/analyzer/pkg/dpkg/copyright_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aquasecurity/trivy/pkg/fanal/analyzer"
Expand Down Expand Up @@ -83,8 +82,9 @@ func Test_dpkgLicenseAnalyzer_Analyze(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := require.New(t)
f, err := os.Open(tt.testFile)
require.NoError(t, err)
r.NoError(err)

input := analyzer.AnalysisInput{
Content: f,
Expand All @@ -93,8 +93,8 @@ func Test_dpkgLicenseAnalyzer_Analyze(t *testing.T) {
a := dpkgLicenseAnalyzer{}

license, err := a.Analyze(context.Background(), input)
require.NoError(t, err)
assert.Equal(t, tt.want, license)
r.NoError(err)
r.Equal(tt.want, license)
})
}
}
Expand Down Expand Up @@ -123,8 +123,9 @@ func Test_dpkgLicenseAnalyzer_Required(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := require.New(t)
a := dpkgLicenseAnalyzer{}
assert.Equal(t, tt.want, a.Required(tt.filePath, nil))
r.Equal(tt.want, a.Required(tt.filePath, nil))
})
}
}
11 changes: 6 additions & 5 deletions cmd/imgcollector/analyzer/pkg/dpkg/dpkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sort"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aquasecurity/trivy/pkg/fanal/analyzer"
Expand Down Expand Up @@ -1030,7 +1029,8 @@ func Test_dpkgAnalyzer_Analyze(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f, err := os.Open(tt.testFile)
require.NoError(t, err)
r := require.New(t)
r.NoError(err)
defer f.Close()

a := dpkgAnalyzer{}
Expand All @@ -1045,8 +1045,8 @@ func Test_dpkgAnalyzer_Analyze(t *testing.T) {
got.PackageInfos[i].Packages = sortPkgs(got.PackageInfos[i].Packages)
}

assert.Equal(t, tt.wantErr, err != nil, err)
assert.Equal(t, tt.want, got)
r.Equal(tt.wantErr, err != nil, err)
r.Equal(tt.want, got)
})
}
}
Expand Down Expand Up @@ -1090,9 +1090,10 @@ func Test_dpkgAnalyzer_Required(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := require.New(t)
a := dpkgAnalyzer{}
got := a.Required(tt.filePath, nil)
assert.Equal(t, tt.want, got)
r.Equal(tt.want, got)
})
}
}
19 changes: 10 additions & 9 deletions cmd/imgcollector/analyzer/pkg/rpm/rpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aquasecurity/trivy/pkg/fanal/types"
Expand Down Expand Up @@ -582,11 +581,12 @@ func TestParseRpmInfo(t *testing.T) {
for testname, tc := range tests {
t.Run(testname, func(t *testing.T) {
f, err := os.Open(tc.path)
require.NoError(t, err)
r := require.New(t)
r.NoError(err)
defer f.Close()

got, _, err := a.parsePkgInfo(f)
require.NoError(t, err)
r.NoError(err)

sort.Slice(tc.pkgs, func(i, j int) bool {
return tc.pkgs[i].Name < tc.pkgs[j].Name
Expand All @@ -600,7 +600,7 @@ func TestParseRpmInfo(t *testing.T) {
got[i].DependsOn = nil // TODO: add tests
}

assert.Equal(t, tc.pkgs, got)
r.Equal(tc.pkgs, got)
})
}
}
Expand Down Expand Up @@ -633,15 +633,16 @@ func Test_splitFileName(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := require.New(t)
gotName, gotVer, gotRel, err := splitFileName(tt.filename)
if tt.wantErr {
assert.Error(t, err)
r.Error(err)
} else {
assert.NoError(t, err)
r.NoError(err)
}
assert.Equal(t, tt.wantName, gotName)
assert.Equal(t, tt.wantVer, gotVer)
assert.Equal(t, tt.wantRel, gotRel)
r.Equal(tt.wantName, gotName)
r.Equal(tt.wantVer, gotVer)
r.Equal(tt.wantRel, gotRel)
})
}
}
11 changes: 6 additions & 5 deletions cmd/imgcollector/analyzer/pkg/rpm/rpmqa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aquasecurity/trivy/pkg/fanal/types"
)
Expand Down Expand Up @@ -60,14 +60,15 @@ glibc 2.35-2.cm2 1653816591 1653628955 Microsoft Corporation (none) 10855265 x86

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := require.New(t)
a := rpmqaPkgAnalyzer{}
result, err := a.parseRpmqaManifest(strings.NewReader(test.content))
if test.wantErr != "" {
assert.NotNil(t, err)
assert.Equal(t, test.wantErr, err.Error())
r.Error(err)
r.Equal(test.wantErr, err.Error())
} else {
assert.NoError(t, err)
assert.Equal(t, test.wantPkgs, result)
r.NoError(err)
r.Equal(test.wantPkgs, result)
}
})
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/imgcollector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"testing"
"testing/fstest"

"github.com/castai/kvisor/cmd/imgcollector/image"
"github.com/stretchr/testify/require"

"github.com/castai/kvisor/cmd/imgcollector/image"
)

func TestReadImagePullSecret(t *testing.T) {
Expand All @@ -18,7 +19,8 @@ func TestReadImagePullSecret(t *testing.T) {
r.NoError(err)

var cfg image.DockerConfig
r.NoError(err, json.Unmarshal(data, &cfg))
err = json.Unmarshal(data, &cfg)
r.NoError(err)
auth := cfg.Auths["ghcr.io"]
r.Equal("username", auth.Username)
r.Equal("password", auth.Password)
Expand Down
5 changes: 3 additions & 2 deletions cmd/imgcollector/image/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/google/go-containerregistry/pkg/name"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNamespacedRegistry(t *testing.T) {
Expand Down Expand Up @@ -37,8 +37,9 @@ func TestNamespacedRegistry(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := require.New(t)
registry := NamespacedRegistry(test.ref)
assert.Equal(t, test.expected, registry)
r.Equal(test.expected, registry)
})
}
}
9 changes: 4 additions & 5 deletions delta/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package delta

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -89,7 +88,7 @@ func TestSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
err := sub.Run(ctx)
r.True(errors.Is(err, context.DeadlineExceeded))
r.ErrorIs(err, context.DeadlineExceeded)
delta := client.delta
r.NotNil(delta)
assertDelta(t, delta, castai.EventAdd, true)
Expand All @@ -106,7 +105,7 @@ func TestSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
err := sub.Run(ctx)
r.True(errors.Is(err, context.DeadlineExceeded))
r.ErrorIs(err, context.DeadlineExceeded)
delta := client.delta
r.NotNil(delta)
assertDelta(t, delta, castai.EventUpdate, true)
Expand All @@ -124,7 +123,7 @@ func TestSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
defer cancel()
err := sub.Run(ctx)
r.True(errors.Is(err, context.DeadlineExceeded))
r.ErrorIs(err, context.DeadlineExceeded)
delta := client.delta
r.NotNil(delta)
assertDelta(t, delta, castai.EventDelete, true)
Expand All @@ -145,7 +144,7 @@ func TestSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*30)
defer cancel()
err := sub.Run(ctx)
r.True(errors.Is(err, context.DeadlineExceeded))
r.ErrorIs(err, context.DeadlineExceeded)
delta := client.delta
r.NotNil(delta)
assertDelta(t, delta, castai.EventAdd, false)
Expand Down
2 changes: 1 addition & 1 deletion delta/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestSnapshotProvider(t *testing.T) {
ObjectCreatedAt: now,
})

r.Len(provider.snapshot(), 0)
r.Empty(provider.snapshot())
}

func TestResyncObserver(t *testing.T) {
Expand Down
57 changes: 37 additions & 20 deletions imagescan/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,18 +254,26 @@ func (s *Controller) scanImages(ctx context.Context, images []*image) error {
}
}

func (s *Controller) scanImage(ctx context.Context, img *image) (rerr error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

mode := s.getScanMode(img)
func (s *Controller) findBestNodeAndMode(img *image) (string, string, error) {
mode := s.cfg.Mode
if img.lastScanErr != nil && errors.Is(img.lastScanErr, errImageScanLayerNotFound) {
// Fallback to remote if previously it failed due to missing layers.
mode = string(imgcollectorconfig.ModeRemote)
}

var nodeNames []string
if imgcollectorconfig.Mode(mode) == imgcollectorconfig.ModeHostFS {
// In HostFS we need to choose only from nodes which contains this image.
nodeNames = lo.Keys(img.nodes)
if len(nodeNames) == 0 {
return errors.New("image with empty nodes")
return "", "", errors.New("image with empty nodes")
}

nodeNames = s.delta.filterCastAIManagedNodes(nodeNames)
if len(nodeNames) == 0 {
// If image is not running on CAST AI managed nodes fallback to remote scan.
mode = string(imgcollectorconfig.ModeRemote)
nodeNames = lo.Keys(s.delta.nodes)
}
} else {
nodeNames = lo.Keys(s.delta.nodes)
Expand All @@ -275,6 +283,28 @@ func (s *Controller) scanImage(ctx context.Context, img *image) (rerr error) {
memQty := resource.MustParse(s.cfg.MemoryRequest)
cpuQty := resource.MustParse(s.cfg.CPURequest)
resolvedNode, err := s.delta.findBestNode(nodeNames, memQty.AsDec(), cpuQty.AsDec())
if err != nil {
if errors.Is(err, errNoCandidates) && imgcollectorconfig.Mode(mode) == imgcollectorconfig.ModeHostFS {
// if mode was host fs fallback to remote scan and try picking node again.
mode = string(imgcollectorconfig.ModeRemote)
nodeNames = lo.Keys(s.delta.nodes)
resolvedNode, err = s.delta.findBestNode(nodeNames, memQty.AsDec(), cpuQty.AsDec())
if err != nil {
return "", "", err
}
} else {
return "", "", err
}
}

return resolvedNode, mode, nil
}

func (s *Controller) scanImage(ctx context.Context, img *image) (rerr error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

node, mode, err := s.findBestNodeAndMode(img)
if err != nil {
return err
}
Expand All @@ -291,7 +321,7 @@ func (s *Controller) scanImage(ctx context.Context, img *image) (rerr error) {
ContainerRuntime: string(img.containerRuntime),
Mode: mode,
ResourceIDs: lo.Keys(img.owners),
NodeName: resolvedNode,
NodeName: node,
DeleteFinishedJob: true,
WaitForCompletion: true,
WaitDurationAfterCompletion: 30 * time.Second,
Expand All @@ -306,19 +336,6 @@ func (s *Controller) concurrentScansNumber() int {
return int(s.cfg.MaxConcurrentScans)
}

// getScanMode returns configured image scan mode if set.
// If mode is empty it will be determined automatically based on container runtime inside scanner.go
//
// Special case:
// If hostfs mode is used and image scan fails due to missing layers remote image scan will be used as fallback.
func (s *Controller) getScanMode(img *image) string {
mode := s.cfg.Mode
if s.delta.isHostFsDisabled() || (img.lastScanErr != nil && errors.Is(img.lastScanErr, errImageScanLayerNotFound)) {
mode = string(imgcollectorconfig.ModeRemote)
}
return mode
}

func (s *Controller) sendImagesResourcesChanges(ctx context.Context) {
images := s.delta.getImages()
var imagesChanges []castai.Image
Expand Down
Loading

0 comments on commit 92c9a97

Please sign in to comment.