Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add job monitor for SLURM #40

Merged
merged 5 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ Imports:
rlang,
utils,
vctrs,
xml2
xml2,
yaml
Suggests:
knitr (>= 1.30),
markdown (>= 1.1),
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export(crew_class_launcher_sge)
export(crew_class_launcher_slurm)
export(crew_class_monitor_cluster)
export(crew_class_monitor_sge)
export(crew_class_monitor_slurm)
export(crew_controller_lsf)
export(crew_controller_pbs)
export(crew_controller_sge)
Expand All @@ -18,6 +19,7 @@ export(crew_launcher_sge)
export(crew_launcher_slurm)
export(crew_monitor_cluster)
export(crew_monitor_sge)
export(crew_monitor_slurm)
importFrom(R6,R6Class)
importFrom(crew,crew_assert)
importFrom(crew,crew_class_launcher)
Expand Down
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# crew.cluster 0.3.0.9000


* Add a "monitor" class for SLURM clusters.

# crew.cluster 0.3.0

Expand Down
119 changes: 119 additions & 0 deletions R/crew_monitor_slurm.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#' @title `r lifecycle::badge("experimental")` Create a SLURM monitor object.
#' @export
#' @family slurm
#' @description Create an `R6` object to monitor SLURM cluster jobs.
#' @inheritParams crew_monitor_cluster
crew_monitor_slurm <- function(
verbose = TRUE,
command_list = as.character(Sys.which("squeue")),
command_terminate = as.character(Sys.which("scancel"))
) {
out <- crew_class_monitor_slurm$new(
verbose = verbose,
command_list = command_list,
command_terminate = command_terminate
)
out$validate()
out
}

#' @title `r lifecycle::badge("experimental")` SLURM monitor class
#' @export
#' @family slurm
#' @description SLURM monitor `R6` class
#' @details See [crew_monitor_slurm()].
crew_class_monitor_slurm <- R6::R6Class(
classname = "crew_class_monitor_slurm",
inherit = crew_class_monitor_cluster,
cloneable = FALSE,
public = list(
#' @description List SLURM jobs.
#'
#' This function loads the entire SLURM queue for all users, so it may take
#' several seconds to execute. It is intended for interactive use, and
#' should especially be avoided in scripts where it is called frequently.
wlandau marked this conversation as resolved.
Show resolved Hide resolved
#' @return A `tibble` with one row per SLURM job and columns with
#' specific details.
#' @param user Character of length 1, user name of the jobs to list.
jobs = function(user = ps::ps_username()) {
# Cannot be tested with automated tests.
# Tested in tests/slurm/monitor.R.
# nocov start
crew::crew_assert(
user,
is.character(.),
length(.) == 1L,
!anyNA(.),
nzchar(.),
message = "'user' must be `NULL` or a character vector of length 1"
)
text <- system2(
private$.command_list,
args = shQuote(c("--yaml")),
stdout = TRUE,
stderr = if_any(private$.verbose, "", FALSE),
wait = TRUE
)
monitor_cols <- c("job_id", "partition", "name", "user_name", "job_state",
"start_time", "node_count", "state_reason")
yaml <- yaml::read_yaml(text = text)
out <- map(
yaml$jobs,
~ tibble::new_tibble(
c(
map(.x[monitor_cols], ~ unlist(.x) %||% NA),
list(
nodes = paste(unlist(.x$job_resources$nodes), collapse = ",") %||% NA
)
)
)
)
out <- do.call(vctrs::vec_rbind, out)
out <- out[out$user_name == user,]
out <- out[which(out$job_state != "CANCELLED"),]
out$job_id <- as.character(out$job_id)
out$start_time <- as.POSIXct(out$start_time, origin = "1970-01-01")
out
# nocov end
},
#' @description Terminate one or more SLURM jobs.
#' @return `NULL` (invisibly).
#' @param jobs Character vector of job names or job IDs to terminate.
#' Ignored if `all` is set to `TRUE`.
#' @param all Logical of length 1, whether to terminate all the jobs
#' under your user name. This terminates ALL your SLURM jobs,
#' regardless of whether `crew.cluster` launched them,
#' so use with caution!
terminate = function(jobs = NULL, all = FALSE) {
# Cannot be tested with automated tests.
# Tested in tests/slurm/monitor.R.
# nocov start
crew::crew_assert(
jobs %||% "x",
is.character(.),
!anyNA(.),
nzchar(.),
message = paste(
"'jobs' must be `NULL` or a character vector of",
"valid job names or IDs."
)
)
crew::crew_assert(
all,
isTRUE(.) || isFALSE(.),
message = "'all' must be TRUE or FALSE."
)
args <- shQuote(if_any(all, c("-u", ps::ps_username()), jobs))
stream <- if_any(private$.verbose, "", FALSE)
system2(
command = private$.command_terminate,
args = args,
stdout = stream,
stderr = stream,
wait = TRUE
)
invisible()
# nocov end
}
)
)
4 changes: 3 additions & 1 deletion man/crew_class_launcher_slurm.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions man/crew_class_monitor_slurm.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion man/crew_controller_slurm.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion man/crew_launcher_slurm.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions man/crew_monitor_slurm.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions tests/slurm/monitor.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
test_that("SLURM monitor terminate one job at a time", {
controller <- crew_controller_slurm(
name = "my_workflow",
workers = 2L,
seconds_idle = 600,
script_lines = paste0("module load R/", getRversion()),
verbose = TRUE
)
on.exit(controller$terminate())
controller$start()
controller$launch(n = 2L)
names <- vapply(
controller$launcher$workers$handle,
function(handle) handle$name,
FUN.VALUE = character(1L)
)
monitor <- crew_monitor_slurm(verbose = TRUE)
crew::crew_retry(
~ all(names %in% monitor$jobs()$name),
seconds_interval = 1,
seconds_timeout = 60,
message = "could not submit jobs"
)
jobs <- monitor$jobs()
jobs <- jobs[jobs$name %in% names, ]
expect_equal(nrow(jobs), 2L)
expect_true(is.character(jobs$job_id))
expect_false(anyNA(jobs$job_id))
expect_true(all(nzchar(jobs$job_id)))
monitor$terminate(jobs = jobs$job_id)
crew::crew_retry(
~ !any(jobs$job_id %in% monitor$jobs()$job_id),
seconds_interval = 1,
seconds_timeout = 60,
message = "could not terminate jobs"
)
expect_false(any(jobs$job_id %in% monitor$jobs()$job_id))
})

test_that("THIS TEST DELETES ALL USER JOBS! USE WITH CAUTION!", {
controller <- crew_controller_slurm(
name = "my_workflow",
workers = 2L,
seconds_idle = 600,
script_lines = paste0("module load R/", getRversion()),
verbose = TRUE
)
on.exit(controller$terminate())
controller$start()
controller$launch(n = 2L)
names <- vapply(
controller$launcher$workers$handle,
function(handle) handle$name,
FUN.VALUE = character(1L)
)
monitor <- crew_monitor_slurm(verbose = TRUE)
crew::crew_retry(
~ all(names %in% monitor$jobs()$name),
seconds_interval = 1,
seconds_timeout = 60,
message = "could not submit jobs"
)
jobs <- monitor$jobs()
jobs <- jobs[jobs$name %in% names, ]
expect_equal(nrow(jobs), 2L)
expect_true(is.character(jobs$job_id))
expect_false(anyNA(jobs$job_id))
expect_true(all(nzchar(jobs$job_id)))
monitor$terminate(all = TRUE)
crew::crew_retry(
~ nrow(monitor$jobs()) < 1L,
seconds_interval = 1,
seconds_timeout = 60,
message = "could not terminate jobs"
)
expect_lt(nrow(monitor$jobs()), 1L)
})