Skip to content

Commit

Permalink
Merge pull request #174 from castai/feat-send-deltas-non-sliding-window
Browse files Browse the repository at this point in the history
feat: send cluster deltas with a non-sliding interval
  • Loading branch information
vladklokun committed Jun 18, 2024
2 parents cae2111 + 554eeaf commit 223d668
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 1 deletion.
14 changes: 13 additions & 1 deletion internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Controller struct {
delta *delta.Delta
deltaMu sync.Mutex

// sendMu ensures that only one goroutine can send deltas
sendMu sync.Mutex

triggerRestart func()

agentVersion *config.AgentVersion
Expand Down Expand Up @@ -280,7 +283,16 @@ func (c *Controller) Run(ctx context.Context) error {
c.healthzProvider.Initialized()

c.log.Infof("sending cluster deltas every %s", c.cfg.Interval)
wait.Until(func() {

wait.NonSlidingUntil(func() {
// Check if another goroutine is trying to send deltas.
if !c.sendMu.TryLock() {
// If it is, don't try to send deltas on this turn to avoid a backlog of sends.
return
}
// Since Mutex.TryLock() acquires a lock on success,
// release it immediately to allow the new sending goroutine to do its job.
defer c.sendMu.Unlock()
c.send(ctx)
}, c.cfg.Interval, ctx.Done())
return nil
Expand Down
152 changes: 152 additions & 0 deletions internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -244,6 +245,157 @@ func TestController_ApiResourcesErrorProcessing(t *testing.T) {
}])
}

func TestController_ShouldSendByInterval(t *testing.T) {
tt := []struct {
name string
sendInterval time.Duration
sendDurations []time.Duration
checkAfter time.Duration
wantSends int64
}{
{
name: "should trigger all sends when none exceed allocated intervals",
sendInterval: 300 * time.Millisecond,
sendDurations: []time.Duration{
// Send by 300ms
250 * time.Millisecond,
// ...by 600ms
250 * time.Millisecond,
// ...by 900ms
250 * time.Millisecond,
// ...by 1200ms
250 * time.Millisecond,
},
checkAfter: 1200 * time.Millisecond,
wantSends: 4,
},
{
name: "should trigger all sends when previous send exceeds one interval",
sendInterval: 300 * time.Millisecond,
sendDurations: []time.Duration{
// At 0, 300: idle, previous still sending
// Send by 600 ms
450 * time.Millisecond,
// Send by 600 ms too, since we expect ticker to fire events even between ticks
50 * time.Millisecond,
},
checkAfter: 600 * time.Millisecond,
wantSends: 2,
},
{
name: "should trigger all sends when previous send exceeds two intervals",
sendInterval: 300 * time.Millisecond,
sendDurations: []time.Duration{
// At 0, 300, 600ms: idle, previous still sending
// Send by 900 ms
650 * time.Millisecond,
// Send by 900 ms too, since we expect ticker to fire events even between ticks
150 * time.Millisecond,
},
checkAfter: 900 * time.Millisecond,
wantSends: 2,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
mockctrl := gomock.NewController(t)
castaiclient := mock_castai.NewMockClient(mockctrl)
version := mock_version.NewMockInterface(mockctrl)
provider := mock_types.NewMockProvider(mockctrl)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: v1.NamespaceDefault, Name: "pod1"}}
_, err := delta.Encode(pod)
r := require.New(t)
r.NoError(err)

clientset := fake.NewSimpleClientset()
metricsClient := metrics_fake.NewSimpleClientset()
dynamicClient := dynamic_fake.NewSimpleDynamicClient(runtime.NewScheme())

version.EXPECT().Full().Return("1.21+").AnyTimes()

clusterID := uuid.New()
log := logrus.New()

var gotSends atomic.Int64
var wg sync.WaitGroup
wg.Add(len(tc.sendDurations))
var sentFirstTickNotification atomic.Bool

var firstTickAt time.Time
var lastSentAt time.Time

for _, sendDuration := range tc.sendDurations {
castaiclient.EXPECT().
SendDelta(gomock.Any(), clusterID.String(), gomock.Any()).
DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error {
if !sentFirstTickNotification.Load() {
firstTickAt = time.Now()
sentFirstTickNotification.Store(true)
}
time.Sleep(sendDuration)
if gotSends.Add(1) == tc.wantSends {
lastSentAt = time.Now()
}
wg.Done()
return nil
})
}

agentVersion := &config.AgentVersion{Version: "1.2.3"}
castaiclient.EXPECT().ExchangeAgentTelemetry(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
Return(&castai.AgentTelemetryResponse{}, nil).
Do(func(ctx context.Context, clusterID string, req *castai.AgentTelemetryRequest) {
r.Equalf("1.2.3", req.AgentVersion, "got request: %+v", req)
})

log.SetLevel(logrus.DebugLevel)
ctrl := New(
log,
clientset,
dynamicClient,
castaiclient,
metricsClient,
provider,
clusterID.String(),
&config.Controller{
Interval: tc.sendInterval,
PrepTimeout: 300 * time.Millisecond,
InitialSleepDuration: 10 * time.Millisecond,
},
version,
agentVersion,
NewHealthzProvider(defaultHealthzCfg, log),
clientset.AuthorizationV1().SelfSubjectAccessReviews(),
"castai-agent",
)

ctrl.Start(ctx.Done())

go func() {
r.NoError(ctrl.Run(ctx))
}()
wg.Wait()

r.Equal(tc.wantSends, gotSends.Load(), "sends don't match, failing at: %s", time.Now())
elapsed := lastSentAt.Sub(firstTickAt)
deadline := tc.checkAfter
r.LessOrEqualf(elapsed, deadline, "elapsed time is greater than deadline: %s > %s", elapsed, deadline)

wait.Until(func() {
if gotSends.Load() == int64(len(tc.sendDurations)) {
cancel()
}
}, 10*time.Millisecond, ctx.Done())
})
}

}

func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
mockctrl := gomock.NewController(t)
castaiclient := mock_castai.NewMockClient(mockctrl)
Expand Down

0 comments on commit 223d668

Please sign in to comment.