From 20047e7f23bb6089c3cfc69715f0cc90fc022ad2 Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Tue, 15 Nov 2022 06:22:56 +0200 Subject: [PATCH 1/9] custom gracefulshutdown per org --- depot/containerstore/containerstore.go | 8 ++++---- depot/steps/run_step.go | 14 ++++++++++++-- depot/transformer/transformer.go | 5 +++++ initializer/initializer.go | 4 ++++ 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/depot/containerstore/containerstore.go b/depot/containerstore/containerstore.go index 0a0e1dd0..62bca770 100644 --- a/depot/containerstore/containerstore.go +++ b/depot/containerstore/containerstore.go @@ -56,10 +56,10 @@ type ContainerConfig struct { MaxCPUShares uint64 SetCPUWeight bool - ReservedExpirationTime time.Duration - ReapInterval time.Duration - MaxLogLinesPerSecond int - MetricReportInterval time.Duration + ReservedExpirationTime time.Duration + ReapInterval time.Duration + MaxLogLinesPerSecond int + MetricReportInterval time.Duration } type containerStore struct { diff --git a/depot/steps/run_step.go b/depot/steps/run_step.go index 62e269ee..59ad8b85 100644 --- a/depot/steps/run_step.go +++ b/depot/steps/run_step.go @@ -28,6 +28,7 @@ type runStep struct { portMappings []executor.PortMapping clock clock.Clock gracefulShutdownInterval time.Duration + gracefulShutDownPerOrg []string suppressExitStatusCode bool sidecar Sidecar } @@ -49,6 +50,7 @@ func NewRun( portMappings []executor.PortMapping, clock clock.Clock, gracefulShutdownInterval time.Duration, + gracefulShutDownPerOrg string[], suppressExitStatusCode bool, ) *runStep { return NewRunWithSidecar( @@ -61,6 +63,7 @@ func NewRun( portMappings, clock, gracefulShutdownInterval, + gracefulShutDownPerOrg, suppressExitStatusCode, Sidecar{}, false, @@ -77,6 +80,7 @@ func NewRunWithSidecar( portMappings []executor.PortMapping, clock clock.Clock, gracefulShutdownInterval time.Duration, + gracefulShutDownPerOrg string[], suppressExitStatusCode bool, sidecar Sidecar, privileged bool, @@ -92,6 +96,7 @@ func NewRunWithSidecar( portMappings: portMappings, clock: clock, gracefulShutdownInterval: gracefulShutdownInterval, + gracefulShutDownPerOrg: gracefulShutDownPerOrg, suppressExitStatusCode: suppressExitStatusCode, sidecar: sidecar, } @@ -260,8 +265,13 @@ func (step *runStep) Run(signals <-chan os.Signal, ready chan<- struct{}) error logger.Debug("signalling-terminate-success") signals = nil - - killTimer := step.clock.NewTimer(step.gracefulShutdownInterval) + + grace := step.gracefulShutdownInterval + if hasCustomGraceInterval(step.gracefulShutDownPerOrg, step.container.CertificateProperties.OrganizationalUnit) { + grace = getCustomGraceInterval(step.gracefulShutDownPerOrg, step.container.CertificateProperties.OrganizationalUnit); + } + + killTimer := step.clock.NewTimer(grace) defer killTimer.Stop() killSwitch = killTimer.C() diff --git a/depot/transformer/transformer.go b/depot/transformer/transformer.go index c9be095f..eb4185fb 100644 --- a/depot/transformer/transformer.go +++ b/depot/transformer/transformer.go @@ -60,6 +60,7 @@ type transformer struct { healthyMonitoringInterval time.Duration unhealthyMonitoringInterval time.Duration gracefulShutdownInterval time.Duration + gracefulShutdownPerOrg string[] healthCheckWorkPool *workpool.WorkPool useContainerProxy bool @@ -110,6 +111,7 @@ func NewTransformer( healthyMonitoringInterval time.Duration, unhealthyMonitoringInterval time.Duration, gracefulShutdownInterval time.Duration, + gracefulShutdownPerOrg string[], healthCheckWorkPool *workpool.WorkPool, opts ...Option, ) *transformer { @@ -123,6 +125,7 @@ func NewTransformer( healthyMonitoringInterval: healthyMonitoringInterval, unhealthyMonitoringInterval: unhealthyMonitoringInterval, gracefulShutdownInterval: gracefulShutdownInterval, + gracefulShutdownPerOrg: gracefulShutdownPerOrg, healthCheckWorkPool: healthCheckWorkPool, clock: clock, } @@ -158,6 +161,7 @@ func (t *transformer) stepFor( ports, t.clock, t.gracefulShutdownInterval, + t.gracefulShutDownPerOrg, suppressExitStatusCode, ) @@ -398,6 +402,7 @@ func (t *transformer) StepsRunner( container.Ports, t.clock, t.gracefulShutdownInterval, + t.gracefulShutdownPerOrg, suppressExitStatusCode, ) } diff --git a/initializer/initializer.go b/initializer/initializer.go index 0db2a54e..35e9ef42 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -149,6 +149,7 @@ type ExecutorConfig struct { UnhealthyMonitoringInterval durationjson.Duration `json:"unhealthy_monitoring_interval,omitempty"` UseSchedulableDiskSize bool `json:"use_schedulable_disk_size,omitempty"` VolmanDriverPaths string `json:"volman_driver_paths"` + GracefulShutdownIntervalPerOrg string[] `json:"GracefulShutdownIntervalPerOrg,omitempty"` } var ( @@ -250,6 +251,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string, gardenHealthcheckRootFS, config.EnableContainerProxy, time.Duration(config.EnvoyDrainTimeout), + config.GracefulShutdownIntervalPerOrg ) hub := event.NewHub() @@ -544,6 +546,7 @@ func initializeTransformer( declarativeHealthcheckRootFS string, enableContainerProxy bool, drainWait time.Duration, + gracefulShutDownPerOrg []string, ) transformer.Transformer { var options []transformer.Option compressor := compressor.NewTgz() @@ -571,6 +574,7 @@ func initializeTransformer( healthyMonitoringInterval, unhealthyMonitoringInterval, gracefulShutdownInterval, + gracefulShutDownPerOrg, healthCheckWorkPool, options..., ) From a3a8b20be109ffa6839e167999719f6f34f59e72 Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Sat, 3 Dec 2022 05:04:53 +0200 Subject: [PATCH 2/9] fix build --- depot/steps/run_step.go | 88 ++++++++++-------- depot/transformer/transformer.go | 152 +++++++++++++------------------ initializer/initializer.go | 8 +- 3 files changed, 120 insertions(+), 128 deletions(-) diff --git a/depot/steps/run_step.go b/depot/steps/run_step.go index 59ad8b85..553a2b90 100644 --- a/depot/steps/run_step.go +++ b/depot/steps/run_step.go @@ -19,18 +19,20 @@ import ( const ExitTimeout = 1 * time.Second type runStep struct { - container garden.Container - model models.RunAction - streamer log_streamer.LogStreamer - logger lager.Logger - externalIP string - internalIP string - portMappings []executor.PortMapping - clock clock.Clock - gracefulShutdownInterval time.Duration - gracefulShutDownPerOrg []string - suppressExitStatusCode bool - sidecar Sidecar + container garden.Container + model models.RunAction + streamer log_streamer.LogStreamer + logger lager.Logger + externalIP string + internalIP string + portMappings []executor.PortMapping + certificateProperties executor.CertificateProperties + clock clock.Clock + gracefulShutdownInterval time.Duration + extendedGracefulShutdownInterval time.Duration + gracefulShutDownPerOrg []string + suppressExitStatusCode bool + sidecar Sidecar } type Sidecar struct { @@ -45,12 +47,11 @@ func NewRun( model models.RunAction, streamer log_streamer.LogStreamer, logger lager.Logger, - externalIP string, - internalIP string, - portMappings []executor.PortMapping, + executorContainer executor.Container, clock clock.Clock, gracefulShutdownInterval time.Duration, - gracefulShutDownPerOrg string[], + extendedGracefulShutdownInterval time.Duration, + gracefulShutDownPerOrg []string, suppressExitStatusCode bool, ) *runStep { return NewRunWithSidecar( @@ -58,11 +59,10 @@ func NewRun( model, streamer, logger, - externalIP, - internalIP, - portMappings, + executorContainer, clock, gracefulShutdownInterval, + extendedGracefulShutdownInterval, gracefulShutDownPerOrg, suppressExitStatusCode, Sidecar{}, @@ -75,30 +75,31 @@ func NewRunWithSidecar( model models.RunAction, streamer log_streamer.LogStreamer, logger lager.Logger, - externalIP string, - internalIP string, - portMappings []executor.PortMapping, + executorContainer executor.Container, clock clock.Clock, gracefulShutdownInterval time.Duration, - gracefulShutDownPerOrg string[], + extendedGracefulShutdownInterval time.Duration, + gracefulShutDownPerOrg []string, suppressExitStatusCode bool, sidecar Sidecar, privileged bool, ) *runStep { logger = logger.Session("run-step") return &runStep{ - container: container, - model: model, - streamer: streamer, - logger: logger, - externalIP: externalIP, - internalIP: internalIP, - portMappings: portMappings, - clock: clock, - gracefulShutdownInterval: gracefulShutdownInterval, - gracefulShutDownPerOrg: gracefulShutDownPerOrg, - suppressExitStatusCode: suppressExitStatusCode, - sidecar: sidecar, + container: container, + model: model, + streamer: streamer, + logger: logger, + externalIP: executorContainer.ExternalIP, + internalIP: executorContainer.InternalIP, + portMappings: executorContainer.Ports, + certificateProperties: executorContainer.CertificateProperties, + clock: clock, + gracefulShutdownInterval: gracefulShutdownInterval, + extendedGracefulShutdownInterval: extendedGracefulShutdownInterval, + gracefulShutDownPerOrg: gracefulShutDownPerOrg, + suppressExitStatusCode: suppressExitStatusCode, + sidecar: sidecar, } } @@ -265,12 +266,12 @@ func (step *runStep) Run(signals <-chan os.Signal, ready chan<- struct{}) error logger.Debug("signalling-terminate-success") signals = nil - + grace := step.gracefulShutdownInterval - if hasCustomGraceInterval(step.gracefulShutDownPerOrg, step.container.CertificateProperties.OrganizationalUnit) { - grace = getCustomGraceInterval(step.gracefulShutDownPerOrg, step.container.CertificateProperties.OrganizationalUnit); + if stringInSlice(step.certificateProperties.OrganizationalUnit[0], step.gracefulShutDownPerOrg) { + grace = step.extendedGracefulShutdownInterval } - + killTimer := step.clock.NewTimer(grace) defer killTimer.Stop() @@ -359,3 +360,12 @@ func (step *runStep) networkingEnvVars() []string { return envVars } + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if ("organization:" + b) == a { + return true + } + } + return false +} diff --git a/depot/transformer/transformer.go b/depot/transformer/transformer.go index eb4185fb..6d1ca5cf 100644 --- a/depot/transformer/transformer.go +++ b/depot/transformer/transformer.go @@ -55,13 +55,14 @@ type transformer struct { tempDir string clock clock.Clock - sidecarRootFS string - useDeclarativeHealthCheck bool - healthyMonitoringInterval time.Duration - unhealthyMonitoringInterval time.Duration - gracefulShutdownInterval time.Duration - gracefulShutdownPerOrg string[] - healthCheckWorkPool *workpool.WorkPool + sidecarRootFS string + useDeclarativeHealthCheck bool + healthyMonitoringInterval time.Duration + unhealthyMonitoringInterval time.Duration + gracefulShutdownInterval time.Duration + extendedGracefulShutdownInterval time.Duration + gracefulShutDownPerOrg []string + healthCheckWorkPool *workpool.WorkPool useContainerProxy bool drainWait time.Duration @@ -111,23 +112,25 @@ func NewTransformer( healthyMonitoringInterval time.Duration, unhealthyMonitoringInterval time.Duration, gracefulShutdownInterval time.Duration, - gracefulShutdownPerOrg string[], + extendedGracefulShutdownInterval time.Duration, + gracefulShutDownPerOrg []string, healthCheckWorkPool *workpool.WorkPool, opts ...Option, ) *transformer { t := &transformer{ - cachedDownloader: cachedDownloader, - uploader: uploader, - compressor: compressor, - downloadLimiter: downloadLimiter, - uploadLimiter: uploadLimiter, - tempDir: tempDir, - healthyMonitoringInterval: healthyMonitoringInterval, - unhealthyMonitoringInterval: unhealthyMonitoringInterval, - gracefulShutdownInterval: gracefulShutdownInterval, - gracefulShutdownPerOrg: gracefulShutdownPerOrg, - healthCheckWorkPool: healthCheckWorkPool, - clock: clock, + cachedDownloader: cachedDownloader, + uploader: uploader, + compressor: compressor, + downloadLimiter: downloadLimiter, + uploadLimiter: uploadLimiter, + tempDir: tempDir, + healthyMonitoringInterval: healthyMonitoringInterval, + unhealthyMonitoringInterval: unhealthyMonitoringInterval, + gracefulShutdownInterval: gracefulShutdownInterval, + extendedGracefulShutdownInterval: extendedGracefulShutdownInterval, + gracefulShutDownPerOrg: gracefulShutDownPerOrg, + healthCheckWorkPool: healthCheckWorkPool, + clock: clock, } for _, o := range opts { @@ -141,9 +144,10 @@ func (t *transformer) stepFor( logStreamer log_streamer.LogStreamer, action *models.Action, container garden.Container, - externalIP string, - internalIP string, - ports []executor.PortMapping, + executorContainer executor.Container, + // externalIP string, + // internalIP string, + // ports []executor.PortMapping, suppressExitStatusCode bool, monitorOutputWrapper bool, logger lager.Logger, @@ -156,11 +160,10 @@ func (t *transformer) stepFor( *actionModel, logStreamer.WithSource(actionModel.LogSource), logger, - externalIP, - internalIP, - ports, + executorContainer, t.clock, t.gracefulShutdownInterval, + t.extendedGracefulShutdownInterval, t.gracefulShutDownPerOrg, suppressExitStatusCode, ) @@ -193,9 +196,7 @@ func (t *transformer) stepFor( logStreamer, actionModel.Action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -213,9 +214,7 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), actionModel.Action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -231,9 +230,7 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), actionModel.Action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -252,9 +249,7 @@ func (t *transformer) stepFor( bufferedLogStreamer, action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -266,9 +261,7 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -289,9 +282,7 @@ func (t *transformer) stepFor( bufferedLogStreamer, action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -303,9 +294,7 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -323,9 +312,7 @@ func (t *transformer) stepFor( logStreamer, action, container, - externalIP, - internalIP, - ports, + executorContainer, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -362,7 +349,7 @@ func overrideSuppressLogOutput(monitorAction *models.Action) { } func (t *transformer) StepsRunner( logger lager.Logger, - container executor.Container, + executorContainer executor.Container, gardenContainer garden.Container, logStreamer log_streamer.LogStreamer, config Config, @@ -370,14 +357,12 @@ func (t *transformer) StepsRunner( var setup, action, postSetup, monitor, longLivedAction ifrit.Runner var substeps []ifrit.Runner - if container.Setup != nil { + if executorContainer.Setup != nil { setup = t.stepFor( logStreamer, - container.Setup, + executorContainer.Setup, gardenContainer, - container.ExternalIP, - container.InternalIP, - container.Ports, + executorContainer, false, false, logger.Session("setup"), @@ -397,17 +382,16 @@ func (t *transformer) StepsRunner( actionModel, log_streamer.NewNoopStreamer(), logger.Session("post-setup"), - container.ExternalIP, - container.InternalIP, - container.Ports, + executorContainer, t.clock, t.gracefulShutdownInterval, - t.gracefulShutdownPerOrg, + t.extendedGracefulShutdownInterval, + t.gracefulShutDownPerOrg, suppressExitStatusCode, ) } - if container.Action == nil { + if executorContainer.Action == nil { err := errors.New("container cannot have empty action") logger.Error("steps-runner-empty-action", err) return nil, err @@ -415,11 +399,9 @@ func (t *transformer) StepsRunner( action = t.stepFor( logStreamer, - container.Action, + executorContainer.Action, gardenContainer, - container.ExternalIP, - container.InternalIP, - container.Ports, + executorContainer, false, false, logger.Session("action"), @@ -427,13 +409,11 @@ func (t *transformer) StepsRunner( substeps = append(substeps, action) - for _, sidecar := range container.Sidecars { + for _, sidecar := range executorContainer.Sidecars { substeps = append(substeps, t.stepFor(logStreamer, sidecar.Action, gardenContainer, - container.ExternalIP, - container.InternalIP, - container.Ports, + executorContainer, false, false, logger.Session("sidecar"), @@ -450,7 +430,7 @@ func (t *transformer) StepsRunner( readinessSidecarName := fmt.Sprintf("%s-envoy-readiness-healthcheck-%d", gardenContainer.Handle(), idx) step := t.createCheck( - &container, + &executorContainer, gardenContainer, config.BindMounts, "", @@ -467,26 +447,24 @@ func (t *transformer) StepsRunner( } } - if container.CheckDefinition != nil && t.useDeclarativeHealthCheck { + if executorContainer.CheckDefinition != nil && t.useDeclarativeHealthCheck { monitor = t.transformCheckDefinition(logger, - &container, + &executorContainer, gardenContainer, logStreamer, config.BindMounts, proxyReadinessChecks, ) substeps = append(substeps, monitor) - } else if container.Monitor != nil { - overrideSuppressLogOutput(container.Monitor) + } else if executorContainer.Monitor != nil { + overrideSuppressLogOutput(executorContainer.Monitor) monitor = steps.NewMonitor( func() ifrit.Runner { return t.stepFor( logStreamer, - container.Monitor, + executorContainer.Monitor, gardenContainer, - container.ExternalIP, - container.InternalIP, - container.Ports, + executorContainer, true, true, logger.Session("monitor-run"), @@ -495,7 +473,7 @@ func (t *transformer) StepsRunner( logger.Session("monitor"), t.clock, logStreamer, - time.Duration(container.StartTimeoutMs)*time.Millisecond, + time.Duration(executorContainer.StartTimeoutMs)*time.Millisecond, t.healthyMonitoringInterval, t.unhealthyMonitoringInterval, t.healthCheckWorkPool, @@ -510,10 +488,10 @@ func (t *transformer) StepsRunner( longLivedAction = action } - if t.useContainerProxy && container.EnableContainerProxy { + if t.useContainerProxy && executorContainer.EnableContainerProxy { containerProxyStep := t.transformContainerProxyStep( gardenContainer, - container, + executorContainer, logger, logStreamer, config.BindMounts, @@ -594,11 +572,11 @@ func (t *transformer) createCheck( runAction, bufferedLogStreamer, logger, - container.ExternalIP, - container.InternalIP, - container.Ports, + *container, t.clock, t.gracefulShutdownInterval, + t.extendedGracefulShutdownInterval, + t.gracefulShutDownPerOrg, true, sidecar, container.Privileged, @@ -759,11 +737,11 @@ func (t *transformer) transformContainerProxyStep( runAction, streamer.WithSource("PROXY"), proxyLogger, - execContainer.ExternalIP, - execContainer.InternalIP, - execContainer.Ports, + execContainer, t.clock, t.gracefulShutdownInterval, + t.extendedGracefulShutdownInterval, + t.gracefulShutDownPerOrg, false, sidecar, execContainer.Privileged, diff --git a/initializer/initializer.go b/initializer/initializer.go index 35e9ef42..e34fd2ae 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -120,6 +120,7 @@ type ExecutorConfig struct { GardenHealthcheckTimeout durationjson.Duration `json:"garden_healthcheck_timeout,omitempty"` GardenNetwork string `json:"garden_network,omitempty"` GracefulShutdownInterval durationjson.Duration `json:"graceful_shutdown_interval,omitempty"` + ExtendedGracefulShutdownInterval durationjson.Duration `json:"extended_graceful_shutdown_interval,omitempty"` HealthCheckContainerOwnerName string `json:"healthcheck_container_owner_name,omitempty"` HealthCheckWorkPoolSize int `json:"healthcheck_work_pool_size,omitempty"` HealthyMonitoringInterval durationjson.Duration `json:"healthy_monitoring_interval,omitempty"` @@ -149,7 +150,7 @@ type ExecutorConfig struct { UnhealthyMonitoringInterval durationjson.Duration `json:"unhealthy_monitoring_interval,omitempty"` UseSchedulableDiskSize bool `json:"use_schedulable_disk_size,omitempty"` VolmanDriverPaths string `json:"volman_driver_paths"` - GracefulShutdownIntervalPerOrg string[] `json:"GracefulShutdownIntervalPerOrg,omitempty"` + GracefulShutdownIntervalPerOrg []string `json:"extended_graceful_shutdown_orgs,omitempty"` } var ( @@ -243,6 +244,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string, time.Duration(config.HealthyMonitoringInterval), time.Duration(config.UnhealthyMonitoringInterval), time.Duration(config.GracefulShutdownInterval), + time.Duration(config.ExtendedGracefulShutdownInterval), healthCheckWorkPool, clock, postSetupHook, @@ -251,7 +253,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string, gardenHealthcheckRootFS, config.EnableContainerProxy, time.Duration(config.EnvoyDrainTimeout), - config.GracefulShutdownIntervalPerOrg + config.GracefulShutdownIntervalPerOrg, ) hub := event.NewHub() @@ -538,6 +540,7 @@ func initializeTransformer( healthyMonitoringInterval time.Duration, unhealthyMonitoringInterval time.Duration, gracefulShutdownInterval time.Duration, + extendedGracefulShutdownInterval time.Duration, healthCheckWorkPool *workpool.WorkPool, clock clock.Clock, postSetupHook []string, @@ -574,6 +577,7 @@ func initializeTransformer( healthyMonitoringInterval, unhealthyMonitoringInterval, gracefulShutdownInterval, + extendedGracefulShutdownInterval, gracefulShutDownPerOrg, healthCheckWorkPool, options..., From d3809ed9eea39f42dd83e57bf2bf075dbdc45bec Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Sat, 3 Dec 2022 05:38:41 +0200 Subject: [PATCH 3/9] fix tests --- depot/steps/run_step.go | 3 +- depot/steps/run_step_test.go | 54 ++++++++++++++++----------- depot/transformer/transformer_test.go | 32 +++++++++------- 3 files changed, 53 insertions(+), 36 deletions(-) diff --git a/depot/steps/run_step.go b/depot/steps/run_step.go index 553a2b90..d78e9a70 100644 --- a/depot/steps/run_step.go +++ b/depot/steps/run_step.go @@ -268,7 +268,8 @@ func (step *runStep) Run(signals <-chan os.Signal, ready chan<- struct{}) error signals = nil grace := step.gracefulShutdownInterval - if stringInSlice(step.certificateProperties.OrganizationalUnit[0], step.gracefulShutDownPerOrg) { + if len(step.certificateProperties.OrganizationalUnit) > 0 && + stringInSlice(step.certificateProperties.OrganizationalUnit[0], step.gracefulShutDownPerOrg) { grace = step.extendedGracefulShutdownInterval } diff --git a/depot/steps/run_step_test.go b/depot/steps/run_step_test.go index 02a7634b..7b58c25b 100644 --- a/depot/steps/run_step_test.go +++ b/depot/steps/run_step_test.go @@ -32,17 +32,20 @@ var _ = Describe("RunAction", func() { gardenClient *fakes.FakeGardenClient logger *lagertest.TestLogger fileDescriptorLimit, processesLimit uint64 - externalIP, internalIP string - portMappings []executor.PortMapping - fakeClock *fakeclock.FakeClock - suppressExitStatusCode bool - - spawnedProcess *gardenfakes.FakeProcess - runError error - testLogSource string - sidecar steps.Sidecar - privileged bool - gracefulShutdownInterval time.Duration = 5 * time.Second + // externalIP, internalIP string + //portMappings []executor.PortMapping + fakeClock *fakeclock.FakeClock + suppressExitStatusCode bool + + spawnedProcess *gardenfakes.FakeProcess + runError error + testLogSource string + sidecar steps.Sidecar + privileged bool + gracefulShutdownInterval time.Duration = 5 * time.Second + extendedGracefulShutdownInterval time.Duration = 10 * time.Second + gracefulShutDownPerOrg []string + executorContainer executor.Container ) BeforeEach(func() { @@ -82,10 +85,17 @@ var _ = Describe("RunAction", func() { gardenClient.Connection.RunStub = func(string, garden.ProcessSpec, garden.ProcessIO) (garden.Process, error) { return spawnedProcess, runError } - - externalIP = "external-ip" - internalIP = "internal-ip" - portMappings = nil + executorContainer = executor.Container{ + ExternalIP: "external-ip", + InternalIP: "internal-ip", + } + executorContainer.Ports = nil + executorContainer.CertificateProperties = executor.CertificateProperties{ + OrganizationalUnit: []string{"test_org"}, + } + // externalIP = "external-ip" + // internalIP = "internal-ip" + // portMappings = nil fakeClock = fakeclock.NewFakeClock(time.Unix(123, 456)) }) @@ -102,11 +112,11 @@ var _ = Describe("RunAction", func() { runAction, fakeStreamer, logger, - externalIP, - internalIP, - portMappings, + executorContainer, fakeClock, gracefulShutdownInterval, + extendedGracefulShutdownInterval, + gracefulShutDownPerOrg, suppressExitStatusCode, sidecar, privileged, @@ -293,7 +303,7 @@ var _ = Describe("RunAction", func() { Context("when the container has port mappings configured", func() { BeforeEach(func() { - portMappings = []executor.PortMapping{ + executorContainer.Ports = []executor.PortMapping{ {HostPort: 1, ContainerPort: 2}, {HostPort: 3, ContainerPort: 4}, } @@ -316,7 +326,7 @@ var _ = Describe("RunAction", func() { Context("and a container proxy is enabled", func() { BeforeEach(func() { - portMappings = []executor.PortMapping{ + executorContainer.Ports = []executor.PortMapping{ {HostPort: 1, ContainerPort: 2, ContainerTLSProxyPort: 5, HostTLSProxyPort: 6}, {HostPort: 3, ContainerPort: 4, ContainerTLSProxyPort: 7, HostTLSProxyPort: 8}, } @@ -356,7 +366,7 @@ var _ = Describe("RunAction", func() { Context("and unproxied ports are disabled", func() { BeforeEach(func() { - portMappings = []executor.PortMapping{ + executorContainer.Ports = []executor.PortMapping{ {HostPort: 0, ContainerPort: 2, ContainerTLSProxyPort: 5, HostTLSProxyPort: 6}, } }) @@ -396,7 +406,7 @@ var _ = Describe("RunAction", func() { Context("when the container does not have any port mappings configured", func() { BeforeEach(func() { - portMappings = []executor.PortMapping{} + executorContainer.Ports = []executor.PortMapping{} }) It("sets all port-related env vars to the empty string", func() { diff --git a/depot/transformer/transformer_test.go b/depot/transformer/transformer_test.go index 75f0887c..cee8f695 100644 --- a/depot/transformer/transformer_test.go +++ b/depot/transformer/transformer_test.go @@ -34,19 +34,21 @@ import ( var _ = Describe("Transformer", func() { Describe("StepsRunner", func() { var ( - logger lager.Logger - optimusPrime transformer.Transformer - container executor.Container - logStreamer log_streamer.LogStreamer - gardenContainer *gardenfakes.FakeContainer - clock *fakeclock.FakeClock - fakeMetronClient *mfakes.FakeIngressClient - healthyMonitoringInterval time.Duration - unhealthyMonitoringInterval time.Duration - gracefulShutdownInterval time.Duration - healthCheckWorkPool *workpool.WorkPool - cfg transformer.Config - options []transformer.Option + logger lager.Logger + optimusPrime transformer.Transformer + container executor.Container + logStreamer log_streamer.LogStreamer + gardenContainer *gardenfakes.FakeContainer + clock *fakeclock.FakeClock + fakeMetronClient *mfakes.FakeIngressClient + healthyMonitoringInterval time.Duration + unhealthyMonitoringInterval time.Duration + gracefulShutdownInterval time.Duration + extendedGracefulShutdownInterval time.Duration + gracefulShutDownPerOrg []string + healthCheckWorkPool *workpool.WorkPool + cfg transformer.Config + options []transformer.Option ) BeforeEach(func() { @@ -61,6 +63,8 @@ var _ = Describe("Transformer", func() { healthyMonitoringInterval = 1 * time.Second unhealthyMonitoringInterval = 1 * time.Millisecond gracefulShutdownInterval = 10 * time.Second + extendedGracefulShutdownInterval = 20 * time.Second + gracefulShutDownPerOrg = []string{"test_org"} var err error healthCheckWorkPool, err = workpool.NewWorkPool(10) @@ -113,6 +117,8 @@ var _ = Describe("Transformer", func() { healthyMonitoringInterval, unhealthyMonitoringInterval, gracefulShutdownInterval, + extendedGracefulShutdownInterval, + gracefulShutDownPerOrg, healthCheckWorkPool, options..., ) From 7e198cf545caa14e083eabb964165fadfcca1739 Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Mon, 5 Dec 2022 23:18:06 +0200 Subject: [PATCH 4/9] add test --- depot/steps/run_step_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/depot/steps/run_step_test.go b/depot/steps/run_step_test.go index 7b58c25b..986ca87d 100644 --- a/depot/steps/run_step_test.go +++ b/depot/steps/run_step_test.go @@ -91,8 +91,9 @@ var _ = Describe("RunAction", func() { } executorContainer.Ports = nil executorContainer.CertificateProperties = executor.CertificateProperties{ - OrganizationalUnit: []string{"test_org"}, + OrganizationalUnit: []string{"organization:extended_test_org"}, } + gracefulShutDownPerOrg = []string{"test_org"} // externalIP = "external-ip" // internalIP = "internal-ip" // portMappings = nil @@ -759,6 +760,23 @@ var _ = Describe("RunAction", func() { Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 5s graceful shutdown interval\\)")) }) + Context("when the org listed as extended shutdown org", func() { + BeforeEach(func() { + gracefulShutDownPerOrg = []string{"some_org", "extended_test_org"} + gracefulShutdownInterval = extendedGracefulShutdownInterval + }) + AfterEach(func() { + gracefulShutdownInterval = 5 + gracefulShutDownPerOrg = []string{"test_org"} + + }) + It("handles properly extended graceful shutdown", func() { + waitExited <- (128 + 9) + Eventually(fakeStreamer.StdoutCallCount).Should(Equal(2)) + Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 10s graceful shutdown interval\\)")) + }) + }) + Context("when the process *still* does not exit after 1m", func() { It("finishes running with failure", func() { fakeClock.WaitForWatcherAndIncrement(steps.ExitTimeout / 2) From 370934a6a10e2ff9df86e956cdf9345d34e82cf1 Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Mon, 5 Dec 2022 23:20:22 +0200 Subject: [PATCH 5/9] change name --- depot/steps/run_step.go | 12 ++++++------ depot/steps/run_step_test.go | 10 +++++----- depot/transformer/transformer.go | 14 +++++++------- initializer/initializer.go | 4 ++-- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/depot/steps/run_step.go b/depot/steps/run_step.go index d78e9a70..54ae59a7 100644 --- a/depot/steps/run_step.go +++ b/depot/steps/run_step.go @@ -30,7 +30,7 @@ type runStep struct { clock clock.Clock gracefulShutdownInterval time.Duration extendedGracefulShutdownInterval time.Duration - gracefulShutDownPerOrg []string + extendedGracefulShutDownOrgs []string suppressExitStatusCode bool sidecar Sidecar } @@ -51,7 +51,7 @@ func NewRun( clock clock.Clock, gracefulShutdownInterval time.Duration, extendedGracefulShutdownInterval time.Duration, - gracefulShutDownPerOrg []string, + extendedGracefulShutDownOrgs []string, suppressExitStatusCode bool, ) *runStep { return NewRunWithSidecar( @@ -63,7 +63,7 @@ func NewRun( clock, gracefulShutdownInterval, extendedGracefulShutdownInterval, - gracefulShutDownPerOrg, + extendedGracefulShutDownOrgs, suppressExitStatusCode, Sidecar{}, false, @@ -79,7 +79,7 @@ func NewRunWithSidecar( clock clock.Clock, gracefulShutdownInterval time.Duration, extendedGracefulShutdownInterval time.Duration, - gracefulShutDownPerOrg []string, + extendedGracefulShutDownOrgs []string, suppressExitStatusCode bool, sidecar Sidecar, privileged bool, @@ -97,7 +97,7 @@ func NewRunWithSidecar( clock: clock, gracefulShutdownInterval: gracefulShutdownInterval, extendedGracefulShutdownInterval: extendedGracefulShutdownInterval, - gracefulShutDownPerOrg: gracefulShutDownPerOrg, + extendedGracefulShutDownOrgs: extendedGracefulShutDownOrgs, suppressExitStatusCode: suppressExitStatusCode, sidecar: sidecar, } @@ -269,7 +269,7 @@ func (step *runStep) Run(signals <-chan os.Signal, ready chan<- struct{}) error grace := step.gracefulShutdownInterval if len(step.certificateProperties.OrganizationalUnit) > 0 && - stringInSlice(step.certificateProperties.OrganizationalUnit[0], step.gracefulShutDownPerOrg) { + stringInSlice(step.certificateProperties.OrganizationalUnit[0], step.extendedGracefulShutDownOrgs) { grace = step.extendedGracefulShutdownInterval } diff --git a/depot/steps/run_step_test.go b/depot/steps/run_step_test.go index 986ca87d..6135e637 100644 --- a/depot/steps/run_step_test.go +++ b/depot/steps/run_step_test.go @@ -44,7 +44,7 @@ var _ = Describe("RunAction", func() { privileged bool gracefulShutdownInterval time.Duration = 5 * time.Second extendedGracefulShutdownInterval time.Duration = 10 * time.Second - gracefulShutDownPerOrg []string + extendedGracefulShutDownOrgs []string executorContainer executor.Container ) @@ -93,7 +93,7 @@ var _ = Describe("RunAction", func() { executorContainer.CertificateProperties = executor.CertificateProperties{ OrganizationalUnit: []string{"organization:extended_test_org"}, } - gracefulShutDownPerOrg = []string{"test_org"} + extendedGracefulShutDownOrgs = []string{"test_org"} // externalIP = "external-ip" // internalIP = "internal-ip" // portMappings = nil @@ -117,7 +117,7 @@ var _ = Describe("RunAction", func() { fakeClock, gracefulShutdownInterval, extendedGracefulShutdownInterval, - gracefulShutDownPerOrg, + extendedGracefulShutDownOrgs, suppressExitStatusCode, sidecar, privileged, @@ -762,12 +762,12 @@ var _ = Describe("RunAction", func() { Context("when the org listed as extended shutdown org", func() { BeforeEach(func() { - gracefulShutDownPerOrg = []string{"some_org", "extended_test_org"} + extendedGracefulShutDownOrgs = []string{"some_org", "extended_test_org"} gracefulShutdownInterval = extendedGracefulShutdownInterval }) AfterEach(func() { gracefulShutdownInterval = 5 - gracefulShutDownPerOrg = []string{"test_org"} + extendedGracefulShutDownOrgs = []string{"test_org"} }) It("handles properly extended graceful shutdown", func() { diff --git a/depot/transformer/transformer.go b/depot/transformer/transformer.go index 6d1ca5cf..c8350499 100644 --- a/depot/transformer/transformer.go +++ b/depot/transformer/transformer.go @@ -61,7 +61,7 @@ type transformer struct { unhealthyMonitoringInterval time.Duration gracefulShutdownInterval time.Duration extendedGracefulShutdownInterval time.Duration - gracefulShutDownPerOrg []string + extendedGracefulShutDownOrgs []string healthCheckWorkPool *workpool.WorkPool useContainerProxy bool @@ -113,7 +113,7 @@ func NewTransformer( unhealthyMonitoringInterval time.Duration, gracefulShutdownInterval time.Duration, extendedGracefulShutdownInterval time.Duration, - gracefulShutDownPerOrg []string, + extendedGracefulShutDownOrgs []string, healthCheckWorkPool *workpool.WorkPool, opts ...Option, ) *transformer { @@ -128,7 +128,7 @@ func NewTransformer( unhealthyMonitoringInterval: unhealthyMonitoringInterval, gracefulShutdownInterval: gracefulShutdownInterval, extendedGracefulShutdownInterval: extendedGracefulShutdownInterval, - gracefulShutDownPerOrg: gracefulShutDownPerOrg, + extendedGracefulShutDownOrgs: extendedGracefulShutDownOrgs, healthCheckWorkPool: healthCheckWorkPool, clock: clock, } @@ -164,7 +164,7 @@ func (t *transformer) stepFor( t.clock, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval, - t.gracefulShutDownPerOrg, + t.extendedGracefulShutDownOrgs, suppressExitStatusCode, ) @@ -386,7 +386,7 @@ func (t *transformer) StepsRunner( t.clock, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval, - t.gracefulShutDownPerOrg, + t.extendedGracefulShutDownOrgs, suppressExitStatusCode, ) } @@ -576,7 +576,7 @@ func (t *transformer) createCheck( t.clock, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval, - t.gracefulShutDownPerOrg, + t.extendedGracefulShutDownOrgs, true, sidecar, container.Privileged, @@ -741,7 +741,7 @@ func (t *transformer) transformContainerProxyStep( t.clock, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval, - t.gracefulShutDownPerOrg, + t.extendedGracefulShutDownOrgs, false, sidecar, execContainer.Privileged, diff --git a/initializer/initializer.go b/initializer/initializer.go index e34fd2ae..d2446146 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -549,7 +549,7 @@ func initializeTransformer( declarativeHealthcheckRootFS string, enableContainerProxy bool, drainWait time.Duration, - gracefulShutDownPerOrg []string, + extendedGracefulShutDownOrgs []string, ) transformer.Transformer { var options []transformer.Option compressor := compressor.NewTgz() @@ -578,7 +578,7 @@ func initializeTransformer( unhealthyMonitoringInterval, gracefulShutdownInterval, extendedGracefulShutdownInterval, - gracefulShutDownPerOrg, + extendedGracefulShutDownOrgs, healthCheckWorkPool, options..., ) From 41fd20582fc514e1397b2571f52a9eec26ae05ac Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Mon, 5 Dec 2022 23:53:05 +0200 Subject: [PATCH 6/9] fix name --- initializer/initializer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/initializer/initializer.go b/initializer/initializer.go index d2446146..83d76494 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -150,7 +150,7 @@ type ExecutorConfig struct { UnhealthyMonitoringInterval durationjson.Duration `json:"unhealthy_monitoring_interval,omitempty"` UseSchedulableDiskSize bool `json:"use_schedulable_disk_size,omitempty"` VolmanDriverPaths string `json:"volman_driver_paths"` - GracefulShutdownIntervalPerOrg []string `json:"extended_graceful_shutdown_orgs,omitempty"` + ExtendedGracefulShutdownOrgs []string `json:"extended_graceful_shutdown_orgs,omitempty"` } var ( From 48c6e30ff0f058974f8259caa6fdf8266475902f Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Tue, 6 Dec 2022 07:40:34 +0200 Subject: [PATCH 7/9] fix --- initializer/initializer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/initializer/initializer.go b/initializer/initializer.go index 83d76494..4e3cf0f9 100644 --- a/initializer/initializer.go +++ b/initializer/initializer.go @@ -253,7 +253,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string, gardenHealthcheckRootFS, config.EnableContainerProxy, time.Duration(config.EnvoyDrainTimeout), - config.GracefulShutdownIntervalPerOrg, + config.ExtendedGracefulShutdownOrgs, ) hub := event.NewHub() From 543e582e00fca5a3acbac29d1f7be2f9f86ed1bc Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Wed, 7 Dec 2022 23:30:20 +0200 Subject: [PATCH 8/9] refactor --- depot/steps/run_step.go | 85 +++++++----------- depot/transformer/transformer.go | 144 +++++++++++++++++++++---------- 2 files changed, 129 insertions(+), 100 deletions(-) diff --git a/depot/steps/run_step.go b/depot/steps/run_step.go index 54ae59a7..62e269ee 100644 --- a/depot/steps/run_step.go +++ b/depot/steps/run_step.go @@ -19,20 +19,17 @@ import ( const ExitTimeout = 1 * time.Second type runStep struct { - container garden.Container - model models.RunAction - streamer log_streamer.LogStreamer - logger lager.Logger - externalIP string - internalIP string - portMappings []executor.PortMapping - certificateProperties executor.CertificateProperties - clock clock.Clock - gracefulShutdownInterval time.Duration - extendedGracefulShutdownInterval time.Duration - extendedGracefulShutDownOrgs []string - suppressExitStatusCode bool - sidecar Sidecar + container garden.Container + model models.RunAction + streamer log_streamer.LogStreamer + logger lager.Logger + externalIP string + internalIP string + portMappings []executor.PortMapping + clock clock.Clock + gracefulShutdownInterval time.Duration + suppressExitStatusCode bool + sidecar Sidecar } type Sidecar struct { @@ -47,11 +44,11 @@ func NewRun( model models.RunAction, streamer log_streamer.LogStreamer, logger lager.Logger, - executorContainer executor.Container, + externalIP string, + internalIP string, + portMappings []executor.PortMapping, clock clock.Clock, gracefulShutdownInterval time.Duration, - extendedGracefulShutdownInterval time.Duration, - extendedGracefulShutDownOrgs []string, suppressExitStatusCode bool, ) *runStep { return NewRunWithSidecar( @@ -59,11 +56,11 @@ func NewRun( model, streamer, logger, - executorContainer, + externalIP, + internalIP, + portMappings, clock, gracefulShutdownInterval, - extendedGracefulShutdownInterval, - extendedGracefulShutDownOrgs, suppressExitStatusCode, Sidecar{}, false, @@ -75,31 +72,28 @@ func NewRunWithSidecar( model models.RunAction, streamer log_streamer.LogStreamer, logger lager.Logger, - executorContainer executor.Container, + externalIP string, + internalIP string, + portMappings []executor.PortMapping, clock clock.Clock, gracefulShutdownInterval time.Duration, - extendedGracefulShutdownInterval time.Duration, - extendedGracefulShutDownOrgs []string, suppressExitStatusCode bool, sidecar Sidecar, privileged bool, ) *runStep { logger = logger.Session("run-step") return &runStep{ - container: container, - model: model, - streamer: streamer, - logger: logger, - externalIP: executorContainer.ExternalIP, - internalIP: executorContainer.InternalIP, - portMappings: executorContainer.Ports, - certificateProperties: executorContainer.CertificateProperties, - clock: clock, - gracefulShutdownInterval: gracefulShutdownInterval, - extendedGracefulShutdownInterval: extendedGracefulShutdownInterval, - extendedGracefulShutDownOrgs: extendedGracefulShutDownOrgs, - suppressExitStatusCode: suppressExitStatusCode, - sidecar: sidecar, + container: container, + model: model, + streamer: streamer, + logger: logger, + externalIP: externalIP, + internalIP: internalIP, + portMappings: portMappings, + clock: clock, + gracefulShutdownInterval: gracefulShutdownInterval, + suppressExitStatusCode: suppressExitStatusCode, + sidecar: sidecar, } } @@ -267,13 +261,7 @@ func (step *runStep) Run(signals <-chan os.Signal, ready chan<- struct{}) error logger.Debug("signalling-terminate-success") signals = nil - grace := step.gracefulShutdownInterval - if len(step.certificateProperties.OrganizationalUnit) > 0 && - stringInSlice(step.certificateProperties.OrganizationalUnit[0], step.extendedGracefulShutDownOrgs) { - grace = step.extendedGracefulShutdownInterval - } - - killTimer := step.clock.NewTimer(grace) + killTimer := step.clock.NewTimer(step.gracefulShutdownInterval) defer killTimer.Stop() killSwitch = killTimer.C() @@ -361,12 +349,3 @@ func (step *runStep) networkingEnvVars() []string { return envVars } - -func stringInSlice(a string, list []string) bool { - for _, b := range list { - if ("organization:" + b) == a { - return true - } - } - return false -} diff --git a/depot/transformer/transformer.go b/depot/transformer/transformer.go index c8350499..1ebde3fd 100644 --- a/depot/transformer/transformer.go +++ b/depot/transformer/transformer.go @@ -144,13 +144,13 @@ func (t *transformer) stepFor( logStreamer log_streamer.LogStreamer, action *models.Action, container garden.Container, - executorContainer executor.Container, - // externalIP string, - // internalIP string, - // ports []executor.PortMapping, + externalIP string, + internalIP string, + ports []executor.PortMapping, suppressExitStatusCode bool, monitorOutputWrapper bool, logger lager.Logger, + grace time.Duration, ) ifrit.Runner { a := action.GetValue() switch actionModel := a.(type) { @@ -160,11 +160,11 @@ func (t *transformer) stepFor( *actionModel, logStreamer.WithSource(actionModel.LogSource), logger, - executorContainer, + externalIP, + internalIP, + ports, t.clock, - t.gracefulShutdownInterval, - t.extendedGracefulShutdownInterval, - t.extendedGracefulShutDownOrgs, + grace, suppressExitStatusCode, ) @@ -196,10 +196,13 @@ func (t *transformer) stepFor( logStreamer, actionModel.Action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ), actionModel.StartMessage, actionModel.SuccessMessage, @@ -214,10 +217,13 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), actionModel.Action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ), time.Duration(actionModel.TimeoutMs)*time.Millisecond, t.clock, @@ -230,10 +236,13 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), actionModel.Action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ), logger, ) @@ -249,10 +258,13 @@ func (t *transformer) stepFor( bufferedLogStreamer, action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ), buffer, ) @@ -261,10 +273,13 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ) } subSteps[i] = subStep @@ -282,7 +297,9 @@ func (t *transformer) stepFor( bufferedLogStreamer, action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, @@ -294,10 +311,13 @@ func (t *transformer) stepFor( logStreamer.WithSource(actionModel.LogSource), action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ) } subSteps[i] = subStep @@ -312,10 +332,13 @@ func (t *transformer) stepFor( logStreamer, action, container, - executorContainer, + externalIP, + internalIP, + ports, suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ) } return steps.NewSerial(subSteps) @@ -349,7 +372,7 @@ func overrideSuppressLogOutput(monitorAction *models.Action) { } func (t *transformer) StepsRunner( logger lager.Logger, - executorContainer executor.Container, + container executor.Container, gardenContainer garden.Container, logStreamer log_streamer.LogStreamer, config Config, @@ -357,15 +380,20 @@ func (t *transformer) StepsRunner( var setup, action, postSetup, monitor, longLivedAction ifrit.Runner var substeps []ifrit.Runner - if executorContainer.Setup != nil { + grace := evalGracefulShutdownInterval(container.CertificateProperties, t.extendedGracefulShutDownOrgs, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval) + + if container.Setup != nil { setup = t.stepFor( logStreamer, - executorContainer.Setup, + container.Setup, gardenContainer, - executorContainer, + container.ExternalIP, + container.InternalIP, + container.Ports, false, false, logger.Session("setup"), + grace, ) } setup = steps.NewTimedStep(logger, setup, config.MetronClient, t.clock, config.CreationStartTime) @@ -382,16 +410,16 @@ func (t *transformer) StepsRunner( actionModel, log_streamer.NewNoopStreamer(), logger.Session("post-setup"), - executorContainer, + container.ExternalIP, + container.InternalIP, + container.Ports, t.clock, - t.gracefulShutdownInterval, - t.extendedGracefulShutdownInterval, - t.extendedGracefulShutDownOrgs, + grace, suppressExitStatusCode, ) } - if executorContainer.Action == nil { + if container.Action == nil { err := errors.New("container cannot have empty action") logger.Error("steps-runner-empty-action", err) return nil, err @@ -399,21 +427,26 @@ func (t *transformer) StepsRunner( action = t.stepFor( logStreamer, - executorContainer.Action, + container.Action, gardenContainer, - executorContainer, + container.ExternalIP, + container.InternalIP, + container.Ports, false, false, logger.Session("action"), + grace, ) substeps = append(substeps, action) - for _, sidecar := range executorContainer.Sidecars { + for _, sidecar := range container.Sidecars { substeps = append(substeps, t.stepFor(logStreamer, sidecar.Action, gardenContainer, - executorContainer, + container.ExternalIP, + container.InternalIP, + container.Ports, false, false, logger.Session("sidecar"), @@ -430,7 +463,7 @@ func (t *transformer) StepsRunner( readinessSidecarName := fmt.Sprintf("%s-envoy-readiness-healthcheck-%d", gardenContainer.Handle(), idx) step := t.createCheck( - &executorContainer, + &container, gardenContainer, config.BindMounts, "", @@ -447,33 +480,36 @@ func (t *transformer) StepsRunner( } } - if executorContainer.CheckDefinition != nil && t.useDeclarativeHealthCheck { + if container.CheckDefinition != nil && t.useDeclarativeHealthCheck { monitor = t.transformCheckDefinition(logger, - &executorContainer, + &container, gardenContainer, logStreamer, config.BindMounts, proxyReadinessChecks, ) substeps = append(substeps, monitor) - } else if executorContainer.Monitor != nil { - overrideSuppressLogOutput(executorContainer.Monitor) + } else if container.Monitor != nil { + overrideSuppressLogOutput(container.Monitor) monitor = steps.NewMonitor( func() ifrit.Runner { return t.stepFor( logStreamer, - executorContainer.Monitor, + container.Monitor, gardenContainer, - executorContainer, + container.ExternalIP, + container.InternalIP, + container.Ports, true, true, logger.Session("monitor-run"), + grace, ) }, logger.Session("monitor"), t.clock, logStreamer, - time.Duration(executorContainer.StartTimeoutMs)*time.Millisecond, + time.Duration(container.StartTimeoutMs)*time.Millisecond, t.healthyMonitoringInterval, t.unhealthyMonitoringInterval, t.healthCheckWorkPool, @@ -488,10 +524,10 @@ func (t *transformer) StepsRunner( longLivedAction = action } - if t.useContainerProxy && executorContainer.EnableContainerProxy { + if t.useContainerProxy && container.EnableContainerProxy { containerProxyStep := t.transformContainerProxyStep( gardenContainer, - executorContainer, + container, logger, logStreamer, config.BindMounts, @@ -560,6 +596,7 @@ func (t *transformer) createCheck( Args: args, } + grace := evalGracefulShutdownInterval(container.CertificateProperties, t.extendedGracefulShutDownOrgs, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval) buffer := log_streamer.NewConcurrentBuffer(bytes.NewBuffer(nil)) bufferedLogStreamer := log_streamer.NewBufferStreamer(buffer, buffer) sidecar := steps.Sidecar{ @@ -572,11 +609,11 @@ func (t *transformer) createCheck( runAction, bufferedLogStreamer, logger, - *container, + container.ExternalIP, + container.InternalIP, + container.Ports, t.clock, - t.gracefulShutdownInterval, - t.extendedGracefulShutdownInterval, - t.extendedGracefulShutDownOrgs, + grace, true, sidecar, container.Privileged, @@ -730,6 +767,7 @@ func (t *transformer) transformContainerProxyStep( Name: fmt.Sprintf("%s-envoy", container.Handle()), } + grace := evalGracefulShutdownInterval(execContainer.CertificateProperties, t.extendedGracefulShutDownOrgs, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval) proxyLogger := logger.Session("proxy") return steps.NewBackground(steps.NewRunWithSidecar( @@ -737,13 +775,25 @@ func (t *transformer) transformContainerProxyStep( runAction, streamer.WithSource("PROXY"), proxyLogger, - execContainer, + execContainer.ExternalIP, + execContainer.InternalIP, + execContainer.Ports, t.clock, - t.gracefulShutdownInterval, - t.extendedGracefulShutdownInterval, - t.extendedGracefulShutDownOrgs, + grace, false, sidecar, execContainer.Privileged, ), proxyLogger) } + +func evalGracefulShutdownInterval(certProps executor.CertificateProperties, ext_orgs []string, + std_grace, ext_grace time.Duration) time.Duration { + if len(certProps.OrganizationalUnit) > 0 { + for _, org := range ext_orgs { + if ("organization:" + org) == certProps.OrganizationalUnit[0] { + return ext_grace + } + } + } + return std_grace +} From 86c7bc155501c1b2c8bb9df4ceaf805ecec25011 Mon Sep 17 00:00:00 2001 From: Vladimir Savchenko Date: Fri, 9 Dec 2022 07:13:57 +0200 Subject: [PATCH 9/9] fix tests --- depot/containerstore/containerstore.go | 8 +-- depot/steps/run_step_test.go | 72 ++++++------------- depot/transformer/transformer.go | 53 +++++++------- depot/transformer/transformer_test.go | 96 +++++++++++++++++++++----- 4 files changed, 130 insertions(+), 99 deletions(-) diff --git a/depot/containerstore/containerstore.go b/depot/containerstore/containerstore.go index 62bca770..0a0e1dd0 100644 --- a/depot/containerstore/containerstore.go +++ b/depot/containerstore/containerstore.go @@ -56,10 +56,10 @@ type ContainerConfig struct { MaxCPUShares uint64 SetCPUWeight bool - ReservedExpirationTime time.Duration - ReapInterval time.Duration - MaxLogLinesPerSecond int - MetricReportInterval time.Duration + ReservedExpirationTime time.Duration + ReapInterval time.Duration + MaxLogLinesPerSecond int + MetricReportInterval time.Duration } type containerStore struct { diff --git a/depot/steps/run_step_test.go b/depot/steps/run_step_test.go index 6135e637..02a7634b 100644 --- a/depot/steps/run_step_test.go +++ b/depot/steps/run_step_test.go @@ -32,20 +32,17 @@ var _ = Describe("RunAction", func() { gardenClient *fakes.FakeGardenClient logger *lagertest.TestLogger fileDescriptorLimit, processesLimit uint64 - // externalIP, internalIP string - //portMappings []executor.PortMapping - fakeClock *fakeclock.FakeClock - suppressExitStatusCode bool - - spawnedProcess *gardenfakes.FakeProcess - runError error - testLogSource string - sidecar steps.Sidecar - privileged bool - gracefulShutdownInterval time.Duration = 5 * time.Second - extendedGracefulShutdownInterval time.Duration = 10 * time.Second - extendedGracefulShutDownOrgs []string - executorContainer executor.Container + externalIP, internalIP string + portMappings []executor.PortMapping + fakeClock *fakeclock.FakeClock + suppressExitStatusCode bool + + spawnedProcess *gardenfakes.FakeProcess + runError error + testLogSource string + sidecar steps.Sidecar + privileged bool + gracefulShutdownInterval time.Duration = 5 * time.Second ) BeforeEach(func() { @@ -85,18 +82,10 @@ var _ = Describe("RunAction", func() { gardenClient.Connection.RunStub = func(string, garden.ProcessSpec, garden.ProcessIO) (garden.Process, error) { return spawnedProcess, runError } - executorContainer = executor.Container{ - ExternalIP: "external-ip", - InternalIP: "internal-ip", - } - executorContainer.Ports = nil - executorContainer.CertificateProperties = executor.CertificateProperties{ - OrganizationalUnit: []string{"organization:extended_test_org"}, - } - extendedGracefulShutDownOrgs = []string{"test_org"} - // externalIP = "external-ip" - // internalIP = "internal-ip" - // portMappings = nil + + externalIP = "external-ip" + internalIP = "internal-ip" + portMappings = nil fakeClock = fakeclock.NewFakeClock(time.Unix(123, 456)) }) @@ -113,11 +102,11 @@ var _ = Describe("RunAction", func() { runAction, fakeStreamer, logger, - executorContainer, + externalIP, + internalIP, + portMappings, fakeClock, gracefulShutdownInterval, - extendedGracefulShutdownInterval, - extendedGracefulShutDownOrgs, suppressExitStatusCode, sidecar, privileged, @@ -304,7 +293,7 @@ var _ = Describe("RunAction", func() { Context("when the container has port mappings configured", func() { BeforeEach(func() { - executorContainer.Ports = []executor.PortMapping{ + portMappings = []executor.PortMapping{ {HostPort: 1, ContainerPort: 2}, {HostPort: 3, ContainerPort: 4}, } @@ -327,7 +316,7 @@ var _ = Describe("RunAction", func() { Context("and a container proxy is enabled", func() { BeforeEach(func() { - executorContainer.Ports = []executor.PortMapping{ + portMappings = []executor.PortMapping{ {HostPort: 1, ContainerPort: 2, ContainerTLSProxyPort: 5, HostTLSProxyPort: 6}, {HostPort: 3, ContainerPort: 4, ContainerTLSProxyPort: 7, HostTLSProxyPort: 8}, } @@ -367,7 +356,7 @@ var _ = Describe("RunAction", func() { Context("and unproxied ports are disabled", func() { BeforeEach(func() { - executorContainer.Ports = []executor.PortMapping{ + portMappings = []executor.PortMapping{ {HostPort: 0, ContainerPort: 2, ContainerTLSProxyPort: 5, HostTLSProxyPort: 6}, } }) @@ -407,7 +396,7 @@ var _ = Describe("RunAction", func() { Context("when the container does not have any port mappings configured", func() { BeforeEach(func() { - executorContainer.Ports = []executor.PortMapping{} + portMappings = []executor.PortMapping{} }) It("sets all port-related env vars to the empty string", func() { @@ -760,23 +749,6 @@ var _ = Describe("RunAction", func() { Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 5s graceful shutdown interval\\)")) }) - Context("when the org listed as extended shutdown org", func() { - BeforeEach(func() { - extendedGracefulShutDownOrgs = []string{"some_org", "extended_test_org"} - gracefulShutdownInterval = extendedGracefulShutdownInterval - }) - AfterEach(func() { - gracefulShutdownInterval = 5 - extendedGracefulShutDownOrgs = []string{"test_org"} - - }) - It("handles properly extended graceful shutdown", func() { - waitExited <- (128 + 9) - Eventually(fakeStreamer.StdoutCallCount).Should(Equal(2)) - Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 10s graceful shutdown interval\\)")) - }) - }) - Context("when the process *still* does not exit after 1m", func() { It("finishes running with failure", func() { fakeClock.WaitForWatcherAndIncrement(steps.ExitTimeout / 2) diff --git a/depot/transformer/transformer.go b/depot/transformer/transformer.go index 1ebde3fd..c181967e 100644 --- a/depot/transformer/transformer.go +++ b/depot/transformer/transformer.go @@ -55,14 +55,14 @@ type transformer struct { tempDir string clock clock.Clock - sidecarRootFS string - useDeclarativeHealthCheck bool - healthyMonitoringInterval time.Duration - unhealthyMonitoringInterval time.Duration - gracefulShutdownInterval time.Duration + sidecarRootFS string + useDeclarativeHealthCheck bool + healthyMonitoringInterval time.Duration + unhealthyMonitoringInterval time.Duration + gracefulShutdownInterval time.Duration extendedGracefulShutdownInterval time.Duration extendedGracefulShutDownOrgs []string - healthCheckWorkPool *workpool.WorkPool + healthCheckWorkPool *workpool.WorkPool useContainerProxy bool drainWait time.Duration @@ -118,19 +118,19 @@ func NewTransformer( opts ...Option, ) *transformer { t := &transformer{ - cachedDownloader: cachedDownloader, - uploader: uploader, - compressor: compressor, - downloadLimiter: downloadLimiter, - uploadLimiter: uploadLimiter, - tempDir: tempDir, - healthyMonitoringInterval: healthyMonitoringInterval, - unhealthyMonitoringInterval: unhealthyMonitoringInterval, - gracefulShutdownInterval: gracefulShutdownInterval, + cachedDownloader: cachedDownloader, + uploader: uploader, + compressor: compressor, + downloadLimiter: downloadLimiter, + uploadLimiter: uploadLimiter, + tempDir: tempDir, + healthyMonitoringInterval: healthyMonitoringInterval, + unhealthyMonitoringInterval: unhealthyMonitoringInterval, + gracefulShutdownInterval: gracefulShutdownInterval, extendedGracefulShutdownInterval: extendedGracefulShutdownInterval, extendedGracefulShutDownOrgs: extendedGracefulShutDownOrgs, - healthCheckWorkPool: healthCheckWorkPool, - clock: clock, + healthCheckWorkPool: healthCheckWorkPool, + clock: clock, } for _, o := range opts { @@ -303,6 +303,7 @@ func (t *transformer) stepFor( suppressExitStatusCode, monitorOutputWrapper, logger, + grace, ), buffer, ) @@ -380,7 +381,7 @@ func (t *transformer) StepsRunner( var setup, action, postSetup, monitor, longLivedAction ifrit.Runner var substeps []ifrit.Runner - grace := evalGracefulShutdownInterval(container.CertificateProperties, t.extendedGracefulShutDownOrgs, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval) + grace := t.evalGracefulShutdownInterval(container.CertificateProperties) if container.Setup != nil { setup = t.stepFor( @@ -450,6 +451,7 @@ func (t *transformer) StepsRunner( false, false, logger.Session("sidecar"), + grace, )) } @@ -596,7 +598,6 @@ func (t *transformer) createCheck( Args: args, } - grace := evalGracefulShutdownInterval(container.CertificateProperties, t.extendedGracefulShutDownOrgs, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval) buffer := log_streamer.NewConcurrentBuffer(bytes.NewBuffer(nil)) bufferedLogStreamer := log_streamer.NewBufferStreamer(buffer, buffer) sidecar := steps.Sidecar{ @@ -613,7 +614,7 @@ func (t *transformer) createCheck( container.InternalIP, container.Ports, t.clock, - grace, + t.evalGracefulShutdownInterval(container.CertificateProperties), true, sidecar, container.Privileged, @@ -767,7 +768,6 @@ func (t *transformer) transformContainerProxyStep( Name: fmt.Sprintf("%s-envoy", container.Handle()), } - grace := evalGracefulShutdownInterval(execContainer.CertificateProperties, t.extendedGracefulShutDownOrgs, t.gracefulShutdownInterval, t.extendedGracefulShutdownInterval) proxyLogger := logger.Session("proxy") return steps.NewBackground(steps.NewRunWithSidecar( @@ -779,21 +779,20 @@ func (t *transformer) transformContainerProxyStep( execContainer.InternalIP, execContainer.Ports, t.clock, - grace, + t.evalGracefulShutdownInterval(execContainer.CertificateProperties), false, sidecar, execContainer.Privileged, ), proxyLogger) } -func evalGracefulShutdownInterval(certProps executor.CertificateProperties, ext_orgs []string, - std_grace, ext_grace time.Duration) time.Duration { +func (t *transformer) evalGracefulShutdownInterval(certProps executor.CertificateProperties) time.Duration { if len(certProps.OrganizationalUnit) > 0 { - for _, org := range ext_orgs { + for _, org := range t.extendedGracefulShutDownOrgs { if ("organization:" + org) == certProps.OrganizationalUnit[0] { - return ext_grace + return t.extendedGracefulShutdownInterval } } } - return std_grace + return t.gracefulShutdownInterval } diff --git a/depot/transformer/transformer_test.go b/depot/transformer/transformer_test.go index cee8f695..0bbd6535 100644 --- a/depot/transformer/transformer_test.go +++ b/depot/transformer/transformer_test.go @@ -17,6 +17,7 @@ import ( mfakes "code.cloudfoundry.org/diego-logging-client/testhelpers" "code.cloudfoundry.org/executor" "code.cloudfoundry.org/executor/depot/log_streamer" + "code.cloudfoundry.org/executor/depot/log_streamer/fake_log_streamer" "code.cloudfoundry.org/executor/depot/transformer" "code.cloudfoundry.org/garden" "code.cloudfoundry.org/garden/gardenfakes" @@ -34,21 +35,21 @@ import ( var _ = Describe("Transformer", func() { Describe("StepsRunner", func() { var ( - logger lager.Logger - optimusPrime transformer.Transformer - container executor.Container - logStreamer log_streamer.LogStreamer - gardenContainer *gardenfakes.FakeContainer - clock *fakeclock.FakeClock - fakeMetronClient *mfakes.FakeIngressClient - healthyMonitoringInterval time.Duration - unhealthyMonitoringInterval time.Duration - gracefulShutdownInterval time.Duration + logger lager.Logger + optimusPrime transformer.Transformer + container executor.Container + logStreamer log_streamer.LogStreamer + gardenContainer *gardenfakes.FakeContainer + clock *fakeclock.FakeClock + fakeMetronClient *mfakes.FakeIngressClient + healthyMonitoringInterval time.Duration + unhealthyMonitoringInterval time.Duration + gracefulShutdownInterval time.Duration extendedGracefulShutdownInterval time.Duration - gracefulShutDownPerOrg []string - healthCheckWorkPool *workpool.WorkPool - cfg transformer.Config - options []transformer.Option + extendedGracefulShutDownOrgs []string + healthCheckWorkPool *workpool.WorkPool + cfg transformer.Config + options []transformer.Option ) BeforeEach(func() { @@ -64,7 +65,7 @@ var _ = Describe("Transformer", func() { unhealthyMonitoringInterval = 1 * time.Millisecond gracefulShutdownInterval = 10 * time.Second extendedGracefulShutdownInterval = 20 * time.Second - gracefulShutDownPerOrg = []string{"test_org"} + extendedGracefulShutDownOrgs = []string{"ext_grace_org"} var err error healthCheckWorkPool, err = workpool.NewWorkPool(10) @@ -118,7 +119,7 @@ var _ = Describe("Transformer", func() { unhealthyMonitoringInterval, gracefulShutdownInterval, extendedGracefulShutdownInterval, - gracefulShutDownPerOrg, + extendedGracefulShutDownOrgs, healthCheckWorkPool, options..., ) @@ -172,7 +173,7 @@ var _ = Describe("Transformer", func() { } return &gardenfakes.FakeProcess{}, nil } - + runner, err := optimusPrime.StepsRunner(logger, container, gardenContainer, logStreamer, cfg) Expect(err).NotTo(HaveOccurred()) @@ -212,7 +213,7 @@ var _ = Describe("Transformer", func() { Expect(container.Monitor.RunAction.GetSuppressLogOutput()).Should(BeFalse()) Expect(processIO.Stdout).ShouldNot(Equal(ioutil.Discard)) Expect(processIO.Stderr).ShouldNot(Equal(ioutil.Discard)) - + process.Signal(os.Interrupt) clock.Increment(1 * time.Second) Eventually(process.Wait()).Should(Receive(nil)) @@ -1692,5 +1693,64 @@ var _ = Describe("Transformer", func() { }) }) }) + + Describe("when extended graceful interval is desired", func() { + var fakeStreamer *fake_log_streamer.FakeLogStreamer; + + JustBeforeEach(func() { + var spawnedProcess *gardenfakes.FakeProcess; + fakeStreamer = new(fake_log_streamer.FakeLogStreamer) + spawnedProcess = new(gardenfakes.FakeProcess) + initializingCh := make(chan struct{}) + waitExitedCh := make(chan int, 1) + fakeStreamer.StdoutReturns(gbytes.NewBuffer()) + fakeStreamer.StderrReturns(gbytes.NewBuffer()) + fakeStreamer.SourceNameReturns("testlogsource") + fakeStreamer.WithSourceReturns(fakeStreamer); + gardenContainer.RunStub = func(processSpec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error) { + return spawnedProcess, nil + } + spawnedProcess.WaitStub = func() (int, error) { + close(initializingCh) + return <-waitExitedCh, nil + } + runner, err := optimusPrime.StepsRunner(logger, container, gardenContainer, fakeStreamer, cfg) + Expect(err).NotTo(HaveOccurred()) + process := ifrit.Background(runner) + Eventually(initializingCh).Should(BeClosed()) + process.Signal(os.Interrupt) + Eventually(spawnedProcess.SignalCallCount).Should(Equal(1)) + Expect(spawnedProcess.SignalArgsForCall(0)).To(Equal(garden.SignalTerminate)) + clock.WaitForWatcherAndIncrement(100 * time.Second) + Eventually(spawnedProcess.SignalCallCount).Should(Equal(2)) + waitExitedCh <- (128 + 9) + Eventually(fakeStreamer.StdoutCallCount).Should(Equal(2)) + }) + + Context("container is a standard org", func() { + BeforeEach(func() { + container.RunInfo.CertificateProperties = executor.CertificateProperties{ + OrganizationalUnit: []string {"organization:std_org"}, + } + }) + + It("has standard graceful shutdown interval", func() { + Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 10s graceful shutdown interval\\)")) + }) + }) + + Context("container is an extended shutdown intervalorg", func() { + BeforeEach(func() { + container.RunInfo.CertificateProperties = executor.CertificateProperties{ + OrganizationalUnit: []string {"organization:ext_grace_org"}, + } + }) + + It("has standard graceful shutdown interval", func() { + Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 20s graceful shutdown interval\\)")) + }) + }) + }) + }) })