Controller groups

Each controller object supports only one type of worker configuration, which you set in advance. However, different controllers may have different types of workers, and crew supports controller groups to coordinate among these different worker types. With third-party launcher plugins from other packages, this mechanism lets you, for example, send some tasks to GPU-capable or high-memory workers while other tasks go to low-spec workers.

We demonstrate with a controller of fully persistent workers which always stay running and a controller of semi-persistent workers which terminate after completing four tasks. We give each controller a name so that the group can refer to it later.

library(crew)
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(name = "semi-persistent", tasks_max = 4L)

crew uses a different TCP port for each controller you run, so please do not create hundreds of controllers. Please see the subsection on ports in the README.

We put these controller objects into a new controller group object.

group <- crew_controller_group(persistent, transient)

This controller group has a global start() method to initialize both controllers.

group$start()

Use the controller argument of push() to choose which worker pool receives the task.

group$push(name = "my task", command = sqrt(4), controller = "semi-persistent")

The controller group also supports global methods for wait(), pop(), and terminate(). These methods operate on all controllers at once by default, but the controllers argument allows you to select a subset of controllers to act on. In the pop() output below, the controller column (one of the truncated variables) indicates which controller ran the task.

group$wait(controllers = "semi-persistent")
group$pop()
#> # A tibble: 1 × 13
#>   name    command result    status error  code trace warnings seconds  seed
#>   <chr>   <chr>   <list>    <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 my task sqrt(4) <dbl [1]> succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>

The map() method applies one command across a set of inputs in a functional programming style, and the controller argument chooses which controller receives the tasks.

group$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6),
  controller = "persistent"
)
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
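
To make the roles of iterate, data, and globals concrete, here is a serial base-R sketch of the values the map() call above computes. It is only an analogue and says nothing about how crew dispatches tasks. In particular, it lumps data and globals into one list of constant inputs, which is a simplification. The helper name serial_map is hypothetical.

```r
# Hypothetical serial analogue of group$map(); it does not use crew.
serial_map <- function(command, iterate, data = list(), globals = list()) {
  n <- length(iterate[[1]])
  lapply(seq_len(n), function(i) {
    # Take the i-th element of each iterated argument.
    args <- lapply(iterate, function(x) x[[i]])
    # Simplification: treat data and globals alike as constant inputs.
    eval(command, c(args, data, globals))
  })
}

results <- serial_map(
  command = quote(a + b + c + d),
  iterate = list(a = c(1, 3), b = c(2, 4)),
  data = list(c = 5),
  globals = list(d = 6)
)
unlist(results)
#> [1] 14 18
```

Each task sees one element of a and b at a time, while c and d stay the same for every task.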

The controller group has a summary() method which aggregates the summaries of one or more controllers.

group$summary()
#> # A tibble: 2 × 8
#>   controller      tasks seconds success error crash cancel warning
#>   <chr>           <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 persistent          2       0       2     0     0      0       0
#> 2 semi-persistent     1       0       1     0     0      0       0

When you are finished, please call terminate() with no arguments to terminate all controllers in the controller group.

group$terminate()

Backup controllers

As described in the introduction vignette, a worker may crash if it runs out of memory or encounters a system-level issue. The controller lets you retry the task, but pop() throws an error if the task’s worker crashed more than crashes_max times in a row.

Controller groups and backup controllers let you configure exactly how tasks are retried. Each controller can designate a backup controller to run tasks that cause too many crashes in the original controller. With backups, you can connect a whole chain of controllers with increasing resources. If you put all these controllers in a controller group, then you can use group-level push() and pop() methods without having to know which controller actually ran the task.

Consider the following example. We define three different controllers that use one another as backups. The default controller uses the medium_memory controller as a backup, and the medium_memory controller uses the high_memory controller as a backup.

library(crew)
library(crew.cluster) # https://wlandau.github.io/crew.cluster/index.html

high_memory <- crew_controller_slurm(
  name = "high_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 64),
  crashes_max = 2
)

medium_memory <- crew_controller_slurm(
  name = "medium_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 32),
  crashes_max = 3,
  backup = high_memory
)

default <- crew_controller_slurm(
  name = "default",
  crashes_max = 4,
  backup = medium_memory
)

We put all three controllers in a controller group. The default controller is listed first, so tasks pushed with group$push() run in the default controller unless you select another one with the controller argument or the task has been rerouted to a backup.

group <- crew_controller_group(default, medium_memory, high_memory)

Consider a task that sometimes exhausts the available memory of its SLURM worker:

group$push(command = my.package::run_heavy_task(), name = "heavy_task")

If the task exhausts available memory and crashes its worker, then pop() informs you:

task <- group$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name       result    status error                  code controller   
#>   <chr>      <list>    <chr>  <chr>                 <int> <chr>        
#> 1 heavy_task <lgl [1]> crash  19 | Connection reset    19 default

When you resubmit the task with group$push(), it will run on the default controller again. But if you keep resubmitting it and it crashes 4 times in a row (from crashes_max = 4), then the next push() will send it to the backup medium_memory controller. If the next 3 pushes also crash the task's worker in the medium_memory controller, then subsequent pushes will use the high_memory controller, which is the backup of medium_memory. If the high_memory controller successfully completes the task, the next group$pop() returns a successful result.

task <- group$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name       result    status  error  code controller
#>   <chr>      <list>    <chr>   <chr> <int> <chr>
#> 1 heavy_task <lgl [1]> success NA        0 high_memory

group$summary() counts the number of times each controller attempted the task, with crashes tallied in the crash column.

group$summary()
#> # A tibble: 3 × 8
#>   controller    tasks seconds success error crash cancel warning
#>   <chr>         <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 default           4       0       0     0     4      0       0
#> 2 medium_memory     3       0       0     0     3      0       0
#> 3 high_memory       1       0       1     0     0      0       0
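
The attempt counts in this summary follow from the crashes_max values. The base-R sketch below models the rule described above (a task moves to the backup after crashes_max consecutive crashes on a controller) and reproduces those counts. It is a toy model with hypothetical names (chain, route_attempts). It does not call crew, and it simply stops when the last controller in the chain runs out of retries.

```r
# Hypothetical base-R model of the backup chain; it does not call crew.
chain <- list(
  default = list(crashes_max = 4L, backup = "medium_memory"),
  medium_memory = list(crashes_max = 3L, backup = "high_memory"),
  high_memory = list(crashes_max = 2L, backup = NULL)
)

# Assumption: the task crashes on every controller except `succeeds_on`,
# and moves to the backup after crashes_max consecutive crashes.
route_attempts <- function(chain, start, succeeds_on) {
  attempts <- character(0)
  current <- start
  crashes <- 0L
  repeat {
    attempts <- c(attempts, current)
    if (identical(current, succeeds_on)) break
    crashes <- crashes + 1L
    if (crashes >= chain[[current]]$crashes_max) {
      backup <- chain[[current]]$backup
      if (is.null(backup)) break # No backup left: stop retrying.
      current <- backup
      crashes <- 0L
    }
  }
  attempts
}

attempts <- route_attempts(chain, start = "default", succeeds_on = "high_memory")
counts <- table(factor(attempts, levels = names(chain)))
counts
```

Under these assumptions the task is attempted 4 times on default, 3 times on medium_memory, and once on high_memory, for 8 pushes in total.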

If you use a controller group like this one in a targets pipeline (with tar_option_set(controller = group) in _targets.R), then tar_make() will automatically retry crashed tasks using the configuration in the controller group. This behavior requires targets version 1.10.0.9002 or later.
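
As a rough sketch, a _targets.R file for such a pipeline might look like the following. It assumes a SLURM cluster and the targets, crew, and crew.cluster packages, and it shortens the chain to two controllers for brevity. The target name heavy_result is hypothetical, and my.package::run_heavy_task() is the placeholder function from the example above.

```r
# _targets.R (sketch; assumes a SLURM cluster and installed packages)
library(targets)
library(crew)
library(crew.cluster)

high_memory <- crew_controller_slurm(
  name = "high_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 64),
  crashes_max = 2
)

default <- crew_controller_slurm(
  name = "default",
  crashes_max = 4,
  backup = high_memory
)

# Pass the controller group object itself, not a character string.
tar_option_set(
  controller = crew_controller_group(default, high_memory)
)

list(
  # Hypothetical target that runs the memory-hungry task.
  tar_target(heavy_result, my.package::run_heavy_task())
)
```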