Skip to content

Commit

Permalink
fix: handle informer handles to fully processed in dump (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikenorgate committed Jun 26, 2024
1 parent 223d668 commit 4bbc19c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
52 changes: 36 additions & 16 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,34 @@ func CollectSingleSnapshot(ctx context.Context,
return nil, err
}

defer queue.ShutDown()

d := delta.New(log, clusterID, v.Full())
for queue.Len() > 0 {
i, _ := queue.Get()
di, ok := i.(*delta.Item)
if !ok {
go func() {
for {
i, _ := queue.Get()
if i == nil {
return
}
di, ok := i.(*delta.Item)
if !ok {
queue.Done(i)
log.Errorf("expected queue item to be of type %T but got %T", &delta.Item{}, i)
continue
}

d.Add(di)
queue.Done(i)
log.Errorf("expected queue item to be of type %T but got %T", &delta.Item{}, i)
continue
}
}()

d.Add(di)
queue.Done(i)
err = collectInitialSnapshot(ctx, log, handledInformers, queue, cfg.PrepTimeout)
if err != nil {
return nil, err
}

log.Debugf("synced %d items", len(d.Cache))

return d.ToCASTAIRequest(), nil
}

Expand Down Expand Up @@ -265,7 +279,7 @@ func (c *Controller) Run(ctx context.Context) error {
}()

g.Go(func() error {
if err := c.collectInitialSnapshot(ctx); err != nil {
if err := collectInitialSnapshot(ctx, c.log, c.informers, c.queue, c.cfg.PrepTimeout); err != nil {
const maxItems = 5
queueContent := c.debugQueueContent(maxItems)
log := c.log.WithField("queue_content", queueContent)
Expand Down Expand Up @@ -378,28 +392,34 @@ func startConditionalInformers(ctx context.Context,

// collectInitialSnapshot is used to add a time buffer to collect the initial snapshot which is larger than periodic
// delta because it contains a significant portion of the Kubernetes state.
func (c *Controller) collectInitialSnapshot(ctx context.Context) error {
c.log.Info("collecting initial cluster snapshot")
func collectInitialSnapshot(
ctx context.Context,
log logrus.FieldLogger,
informers map[string]*custominformers.HandledInformer,
queue workqueue.Interface,
prepTimeout time.Duration,
) error {
log.Info("collecting initial cluster snapshot")

startedAt := time.Now()

ctx, cancel := context.WithTimeout(ctx, c.cfg.PrepTimeout)
ctx, cancel := context.WithTimeout(ctx, prepTimeout)
defer cancel()

// Collect initial state from cached informers and push to deltas queue.
for _, informer := range c.informers {
for _, informer := range informers {
for _, item := range informer.GetStore().List() {
informer.Handler.OnAdd(item, true)
}
}

cond := func() (done bool, err error) {
queueLen := c.queue.Len()
log := c.log.WithField("queue_length", queueLen)
queueLen := queue.Len()
log := log.WithField("queue_length", queueLen)
log.Debug("waiting until initial queue empty")

if queueLen == 0 {
c.log.Infof("done waiting for initial cluster snapshot collection after %v", time.Since(startedAt))
log.Infof("done waiting for initial cluster snapshot collection after %v", time.Since(startedAt))
return true, nil
}

Expand Down
4 changes: 3 additions & 1 deletion internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,9 @@ func TestCollectSingleSnapshot(t *testing.T) {
clientset,
dynamic_fake.NewSimpleDynamicClient(runtime.NewScheme()),
metrics_fake.NewSimpleClientset(),
&config.Controller{},
&config.Controller{
PrepTimeout: 10 * time.Second,
},
version,
"",
)
Expand Down

0 comments on commit 4bbc19c

Please sign in to comment.