Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom grace #69

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions depot/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type transformer struct {
healthyMonitoringInterval time.Duration
unhealthyMonitoringInterval time.Duration
gracefulShutdownInterval time.Duration
extendedGracefulShutdownInterval time.Duration
extendedGracefulShutDownOrgs []string
healthCheckWorkPool *workpool.WorkPool

useContainerProxy bool
Expand Down Expand Up @@ -110,6 +112,8 @@ func NewTransformer(
healthyMonitoringInterval time.Duration,
unhealthyMonitoringInterval time.Duration,
gracefulShutdownInterval time.Duration,
extendedGracefulShutdownInterval time.Duration,
extendedGracefulShutDownOrgs []string,
healthCheckWorkPool *workpool.WorkPool,
opts ...Option,
) *transformer {
Expand All @@ -123,6 +127,8 @@ func NewTransformer(
healthyMonitoringInterval: healthyMonitoringInterval,
unhealthyMonitoringInterval: unhealthyMonitoringInterval,
gracefulShutdownInterval: gracefulShutdownInterval,
extendedGracefulShutdownInterval: extendedGracefulShutdownInterval,
extendedGracefulShutDownOrgs: extendedGracefulShutDownOrgs,
healthCheckWorkPool: healthCheckWorkPool,
clock: clock,
}
Expand All @@ -144,6 +150,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode bool,
monitorOutputWrapper bool,
logger lager.Logger,
grace time.Duration,
) ifrit.Runner {
a := action.GetValue()
switch actionModel := a.(type) {
Expand All @@ -157,7 +164,7 @@ func (t *transformer) stepFor(
internalIP,
ports,
t.clock,
t.gracefulShutdownInterval,
grace,
suppressExitStatusCode,
)

Expand Down Expand Up @@ -195,6 +202,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
),
actionModel.StartMessage,
actionModel.SuccessMessage,
Expand All @@ -215,6 +223,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
),
time.Duration(actionModel.TimeoutMs)*time.Millisecond,
t.clock,
Expand All @@ -233,6 +242,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
),
logger,
)
Expand All @@ -254,6 +264,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
),
buffer,
)
Expand All @@ -268,6 +279,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
)
}
subSteps[i] = subStep
Expand All @@ -291,6 +303,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
),
buffer,
)
Expand All @@ -305,6 +318,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
)
}
subSteps[i] = subStep
Expand All @@ -325,6 +339,7 @@ func (t *transformer) stepFor(
suppressExitStatusCode,
monitorOutputWrapper,
logger,
grace,
)
}
return steps.NewSerial(subSteps)
Expand Down Expand Up @@ -366,6 +381,8 @@ func (t *transformer) StepsRunner(
var setup, action, postSetup, monitor, longLivedAction ifrit.Runner
var substeps []ifrit.Runner

grace := t.evalGracefulShutdownInterval(container.CertificateProperties)

if container.Setup != nil {
setup = t.stepFor(
logStreamer,
Expand All @@ -377,6 +394,7 @@ func (t *transformer) StepsRunner(
false,
false,
logger.Session("setup"),
grace,
)
}
setup = steps.NewTimedStep(logger, setup, config.MetronClient, t.clock, config.CreationStartTime)
Expand All @@ -397,7 +415,7 @@ func (t *transformer) StepsRunner(
container.InternalIP,
container.Ports,
t.clock,
t.gracefulShutdownInterval,
grace,
suppressExitStatusCode,
)
}
Expand All @@ -418,6 +436,7 @@ func (t *transformer) StepsRunner(
false,
false,
logger.Session("action"),
grace,
)

substeps = append(substeps, action)
Expand All @@ -432,6 +451,7 @@ func (t *transformer) StepsRunner(
false,
false,
logger.Session("sidecar"),
grace,
))
}

Expand Down Expand Up @@ -485,6 +505,7 @@ func (t *transformer) StepsRunner(
true,
true,
logger.Session("monitor-run"),
grace,
)
},
logger.Session("monitor"),
Expand Down Expand Up @@ -593,7 +614,7 @@ func (t *transformer) createCheck(
container.InternalIP,
container.Ports,
t.clock,
t.gracefulShutdownInterval,
t.evalGracefulShutdownInterval(container.CertificateProperties),
true,
sidecar,
container.Privileged,
Expand Down Expand Up @@ -758,9 +779,20 @@ func (t *transformer) transformContainerProxyStep(
execContainer.InternalIP,
execContainer.Ports,
t.clock,
t.gracefulShutdownInterval,
t.evalGracefulShutdownInterval(execContainer.CertificateProperties),
false,
sidecar,
execContainer.Privileged,
), proxyLogger)
}

func (t *transformer) evalGracefulShutdownInterval(certProps executor.CertificateProperties) time.Duration {
if len(certProps.OrganizationalUnit) > 0 {
for _, org := range t.extendedGracefulShutDownOrgs {
if ("organization:" + org) == certProps.OrganizationalUnit[0] {
return t.extendedGracefulShutdownInterval
}
}
}
return t.gracefulShutdownInterval
}
70 changes: 68 additions & 2 deletions depot/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
mfakes "code.cloudfoundry.org/diego-logging-client/testhelpers"
"code.cloudfoundry.org/executor"
"code.cloudfoundry.org/executor/depot/log_streamer"
"code.cloudfoundry.org/executor/depot/log_streamer/fake_log_streamer"
"code.cloudfoundry.org/executor/depot/transformer"
"code.cloudfoundry.org/garden"
"code.cloudfoundry.org/garden/gardenfakes"
Expand Down Expand Up @@ -44,6 +45,8 @@ var _ = Describe("Transformer", func() {
healthyMonitoringInterval time.Duration
unhealthyMonitoringInterval time.Duration
gracefulShutdownInterval time.Duration
extendedGracefulShutdownInterval time.Duration
extendedGracefulShutDownOrgs []string
healthCheckWorkPool *workpool.WorkPool
cfg transformer.Config
options []transformer.Option
Expand All @@ -61,6 +64,8 @@ var _ = Describe("Transformer", func() {
healthyMonitoringInterval = 1 * time.Second
unhealthyMonitoringInterval = 1 * time.Millisecond
gracefulShutdownInterval = 10 * time.Second
extendedGracefulShutdownInterval = 20 * time.Second
extendedGracefulShutDownOrgs = []string{"ext_grace_org"}

var err error
healthCheckWorkPool, err = workpool.NewWorkPool(10)
Expand Down Expand Up @@ -113,6 +118,8 @@ var _ = Describe("Transformer", func() {
healthyMonitoringInterval,
unhealthyMonitoringInterval,
gracefulShutdownInterval,
extendedGracefulShutdownInterval,
extendedGracefulShutDownOrgs,
healthCheckWorkPool,
options...,
)
Expand Down Expand Up @@ -166,7 +173,7 @@ var _ = Describe("Transformer", func() {
}
return &gardenfakes.FakeProcess{}, nil
}

runner, err := optimusPrime.StepsRunner(logger, container, gardenContainer, logStreamer, cfg)
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -206,7 +213,7 @@ var _ = Describe("Transformer", func() {
Expect(container.Monitor.RunAction.GetSuppressLogOutput()).Should(BeFalse())
Expect(processIO.Stdout).ShouldNot(Equal(ioutil.Discard))
Expect(processIO.Stderr).ShouldNot(Equal(ioutil.Discard))

process.Signal(os.Interrupt)
clock.Increment(1 * time.Second)
Eventually(process.Wait()).Should(Receive(nil))
Expand Down Expand Up @@ -1686,5 +1693,64 @@ var _ = Describe("Transformer", func() {
})
})
})

Describe("when extended graceful interval is desired", func() {
var fakeStreamer *fake_log_streamer.FakeLogStreamer;

JustBeforeEach(func() {
var spawnedProcess *gardenfakes.FakeProcess;
fakeStreamer = new(fake_log_streamer.FakeLogStreamer)
spawnedProcess = new(gardenfakes.FakeProcess)
initializingCh := make(chan struct{})
waitExitedCh := make(chan int, 1)
fakeStreamer.StdoutReturns(gbytes.NewBuffer())
fakeStreamer.StderrReturns(gbytes.NewBuffer())
fakeStreamer.SourceNameReturns("testlogsource")
fakeStreamer.WithSourceReturns(fakeStreamer);
gardenContainer.RunStub = func(processSpec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error) {
return spawnedProcess, nil
}
spawnedProcess.WaitStub = func() (int, error) {
close(initializingCh)
return <-waitExitedCh, nil
}
runner, err := optimusPrime.StepsRunner(logger, container, gardenContainer, fakeStreamer, cfg)
Expect(err).NotTo(HaveOccurred())
process := ifrit.Background(runner)
Eventually(initializingCh).Should(BeClosed())
process.Signal(os.Interrupt)
Eventually(spawnedProcess.SignalCallCount).Should(Equal(1))
Expect(spawnedProcess.SignalArgsForCall(0)).To(Equal(garden.SignalTerminate))
clock.WaitForWatcherAndIncrement(100 * time.Second)
Eventually(spawnedProcess.SignalCallCount).Should(Equal(2))
waitExitedCh <- (128 + 9)
Eventually(fakeStreamer.StdoutCallCount).Should(Equal(2))
})

Context("container is a standard org", func() {
BeforeEach(func() {
container.RunInfo.CertificateProperties = executor.CertificateProperties{
OrganizationalUnit: []string {"organization:std_org"},
}
})

It("has standard graceful shutdown interval", func() {
Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 10s graceful shutdown interval\\)"))
})
})

Context("container is an extended shutdown intervalorg", func() {
BeforeEach(func() {
container.RunInfo.CertificateProperties = executor.CertificateProperties{
OrganizationalUnit: []string {"organization:ext_grace_org"},
}
})

It("has standard graceful shutdown interval", func() {
Expect(fakeStreamer.Stdout()).To(gbytes.Say("Exit status 137 \\(exceeded 20s graceful shutdown interval\\)"))
})
})
})

})
})
8 changes: 8 additions & 0 deletions initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type ExecutorConfig struct {
GardenHealthcheckTimeout durationjson.Duration `json:"garden_healthcheck_timeout,omitempty"`
GardenNetwork string `json:"garden_network,omitempty"`
GracefulShutdownInterval durationjson.Duration `json:"graceful_shutdown_interval,omitempty"`
ExtendedGracefulShutdownInterval durationjson.Duration `json:"extended_graceful_shutdown_interval,omitempty"`
HealthCheckContainerOwnerName string `json:"healthcheck_container_owner_name,omitempty"`
HealthCheckWorkPoolSize int `json:"healthcheck_work_pool_size,omitempty"`
HealthyMonitoringInterval durationjson.Duration `json:"healthy_monitoring_interval,omitempty"`
Expand Down Expand Up @@ -149,6 +150,7 @@ type ExecutorConfig struct {
UnhealthyMonitoringInterval durationjson.Duration `json:"unhealthy_monitoring_interval,omitempty"`
UseSchedulableDiskSize bool `json:"use_schedulable_disk_size,omitempty"`
VolmanDriverPaths string `json:"volman_driver_paths"`
ExtendedGracefulShutdownOrgs []string `json:"extended_graceful_shutdown_orgs,omitempty"`
}

var (
Expand Down Expand Up @@ -242,6 +244,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string,
time.Duration(config.HealthyMonitoringInterval),
time.Duration(config.UnhealthyMonitoringInterval),
time.Duration(config.GracefulShutdownInterval),
time.Duration(config.ExtendedGracefulShutdownInterval),
healthCheckWorkPool,
clock,
postSetupHook,
Expand All @@ -250,6 +253,7 @@ func Initialize(logger lager.Logger, config ExecutorConfig, cellID, zone string,
gardenHealthcheckRootFS,
config.EnableContainerProxy,
time.Duration(config.EnvoyDrainTimeout),
config.ExtendedGracefulShutdownOrgs,
)

hub := event.NewHub()
Expand Down Expand Up @@ -536,6 +540,7 @@ func initializeTransformer(
healthyMonitoringInterval time.Duration,
unhealthyMonitoringInterval time.Duration,
gracefulShutdownInterval time.Duration,
extendedGracefulShutdownInterval time.Duration,
healthCheckWorkPool *workpool.WorkPool,
clock clock.Clock,
postSetupHook []string,
Expand All @@ -544,6 +549,7 @@ func initializeTransformer(
declarativeHealthcheckRootFS string,
enableContainerProxy bool,
drainWait time.Duration,
extendedGracefulShutDownOrgs []string,
) transformer.Transformer {
var options []transformer.Option
compressor := compressor.NewTgz()
Expand Down Expand Up @@ -571,6 +577,8 @@ func initializeTransformer(
healthyMonitoringInterval,
unhealthyMonitoringInterval,
gracefulShutdownInterval,
extendedGracefulShutdownInterval,
extendedGracefulShutDownOrgs,
healthCheckWorkPool,
options...,
)
Expand Down