Each controller object supports only one type of worker configuration,
which you set in advance. However, different controllers may have
different types of workers, and crew supports controller groups to
coordinate among these different worker types. With third-party launcher
plugins from other packages, this mechanism lets you, for example, send
some tasks to GPU-capable or high-memory workers while other tasks go to
low-spec workers.
We demonstrate with two controllers: one with fully persistent workers that always stay running, and one with semi-persistent workers that terminate after completing four tasks. Each controller object gets a name.
library(crew)
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(name = "semi-persistent", tasks_max = 4L)
crew uses a different TCP port for each controller you run, so please do
not create hundreds of controllers. Please see the subsection on ports
in the README.
We put these controller objects into a new controller group object.
This controller group has a global connect() method to initialize both
controllers.
You can choose which worker pool receives each task.
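The calls for these three steps do not appear above. What follows is a minimal sketch, assuming a recent crew release in which the group is built with crew_controller_group() and initialized with start() (the prose above calls the method connect()), and in which push() accepts a controller argument. The task name and command are taken from the pop() output shown further down.

```r
library(crew)

# Controllers as defined earlier in this section.
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(name = "semi-persistent", tasks_max = 4L)

# Combine both controllers into a single controller group.
group <- crew_controller_group(persistent, transient)

# Assumption: recent crew releases name the initialization method start().
group$start()

# Send one task to the semi-persistent pool, selected by controller name.
group$push(
  name = "my task",
  command = sqrt(4),
  controller = "semi-persistent"
)
```

The sketch leaves the task unpopped and the group running so that the wait() and pop() calls below can pick it up.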
The controller group also supports global methods for wait(), pop(), and
terminate(). These methods operate on all controllers at once by
default, but the controllers argument allows you to select a subset of
controllers to act on. In the pop() output below, the controller column
indicates which controller ran the task.
group$wait(controllers = "semi-persistent")
group$pop()
#> # A tibble: 1 × 13
#>   name    command result    status error  code trace warnings seconds  seed
#>   <chr>   <chr>   <list>    <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 my task sqrt(4) <dbl [1]> succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
The map() method provides functional programming, and the controller
argument lets you choose which controller receives the tasks.
group$map(
  command = a + b + c + d,
  iterate = list(
    a = c(1, 3),
    b = c(2, 4)
  ),
  data = list(c = 5),
  globals = list(d = 6),
  controller = "persistent"
)
#> # A tibble: 2 × 13
#>   name       command result status error  code trace warnings seconds  seed
#>   <chr>      <chr>   <list> <chr>  <chr> <int> <chr> <chr>      <dbl> <int>
#> 1 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> 2 unnamed_t… a + b … <dbl>  succe… NA        0 NA    NA             0    NA
#> # ℹ 3 more variables: algorithm <chr>, controller <chr>, worker <chr>
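To see what each of the two tasks evaluates, the same arithmetic can be reproduced in base R without crew. This is only a sketch of the semantics suggested by the call above, assuming iterate supplies one value per task while data and globals stay constant across tasks. Here mapply() stands in for the workers, and the variable names simply mirror the arguments of map().

```r
# Base R stand-in for the map() call above; no crew workers are involved.
iterate <- list(a = c(1, 3), b = c(2, 4))  # one element per task
data <- list(c = 5)                        # same value for every task
globals <- list(d = 6)                     # same value for every task

# Evaluate a + b + c + d once per pair of iterated values.
results <- mapply(
  function(a, b) a + b + data$c + globals$d,
  iterate$a,
  iterate$b
)
print(results)
#> [1] 14 18
```

Under that assumption, the first task computes 1 + 2 + 5 + 6 and the second computes 3 + 4 + 5 + 6.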
The controller group has a summary() method which aggregates the
summaries of one or more controllers.
group$summary()
#> # A tibble: 2 × 8
#>   controller      tasks seconds success error crash cancel warning
#>   <chr>           <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 persistent          2       0       2     0     0      0       0
#> 2 semi-persistent     1       0       1     0     0      0       0
When you are finished, please call terminate() with no arguments to
terminate all controllers in the controller group.
As described in the introduction vignette, a worker may crash if it runs
out of memory or encounters a system-level issue. The controller lets
you retry the task, but pop() throws an error if the task’s worker
crashed more than crashes_max times in a row.
Controller groups and backup controllers let you configure exactly
how tasks are retried. Each controller can designate a backup controller
to run tasks that cause too many crashes in the original controller.
With backups, you can connect a whole chain of controllers with
increasing resources. If you put all these controllers in a controller
group, then you can use group-level push() and pop() methods without
having to know which controller actually ran the task.
Consider the following example. We define three different controllers
that use one another as backups. The default controller uses the
medium_memory controller as a backup, and the medium_memory controller
uses the high_memory controller as a backup.
library(crew)
library(crew.cluster) # https://wlandau.github.io/crew.cluster/index.html
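# Last resort in the chain: the most memory and no backup of its own.
high_memory <- crew_controller_slurm(
  name = "high_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 64),
  crashes_max = 2
)
# Middle of the chain: less memory than high_memory (32 is illustrative).
medium_memory <- crew_controller_slurm(
  name = "medium_memory",
  options_cluster = crew_options_slurm(memory_gigabytes_required = 32),
  crashes_max = 3,
  backup = high_memory
)
# First controller to receive tasks, with default SLURM resources.
default <- crew_controller_slurm(
  name = "default",
  crashes_max = 4,
  backup = medium_memory
)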
We put all three controllers in a controller group. The default
controller is listed first, so tasks pushed with group$push() will
automatically run in the default controller unless repeated crashes
hand them over to a backup.
Consider a task named heavy_task, pushed with group$push(), that sometimes exhausts the available memory of its SLURM worker.
If the task exhausts available memory and crashes its worker, then
pop() informs you:
task <- group$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name       result    status error                  code controller
#>   <chr>      <list>    <chr>  <chr>                 <int> <chr>
#> 1 heavy_task <lgl [1]> crash  19 | Connection reset    19 default
When you resubmit the task with group$push(), it will run on the default
controller again. But if you keep resubmitting it and it crashes 4 times
in a row (from crashes_max = 4), then the next push() will send it to
the backup medium_memory controller. If the next 3 pushes also crash the
task’s worker in the medium_memory controller, then subsequent pushes
will use the high_memory controller, which is the backup of
medium_memory. If the high_memory controller successfully completes the
task, the next group$pop() returns a successful result.
task <- group$pop()
task[, c("name", "result", "status", "error", "code", "controller")]
#> # A tibble: 1 × 6
#>   name       result    status  error  code controller
#>   <chr>      <list>    <chr>   <chr> <int> <chr>
#> 1 heavy_task <lgl [1]> success NA        0 high_memory
group$summary() counts the number of times each controller attempted the
task, with crashes tallied in the crash column.
group$summary()
#> # A tibble: 3 × 8
#>   controller    tasks seconds success error crash cancel warning
#>   <chr>         <int>   <dbl>   <int> <int> <int>  <int>   <int>
#> 1 default           4       0       0     0     4      0       0
#> 2 medium_memory     3       0       0     0     3      0       0
#> 3 high_memory       1       0       1     0     0      0       0
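The attempt counts in this summary follow from the crashes_max settings. As a toy model that does not call crew, and assuming each controller gets exactly crashes_max consecutive tries before its backup takes over (as in the narrative above), the tally can be reproduced in base R. The variable names here are illustrative only.

```r
# Toy model of the escalation rule described above; it does not use crew.
controllers <- c("default", "medium_memory", "high_memory")
crashes_max <- c(4L, 3L, 2L)

# Controller that receives each consecutive attempt, assuming every
# earlier attempt crashed: each one gets crashes_max tries in turn.
schedule <- rep(controllers, times = crashes_max)

# In the scenario above, the first attempt on high_memory succeeds,
# so the sequence of attempts stops there.
attempts <- schedule[seq_len(match("high_memory", schedule))]

# Count attempts per controller, keeping the order of the chain.
tally <- table(factor(attempts, levels = controllers))
print(tally)
# 4 attempts on default, 3 on medium_memory, 1 on high_memory
```

The task is attempted 4 + 3 + 1 = 8 times in total, which matches the tasks column of the summary.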
If you use a controller group like this one in a targets pipeline (with
tar_option_set(controller = group) in _targets.R), then tar_make() will
automatically retry crashed tasks using the configuration in the
controller group. This behavior requires targets version 1.10.0.9002 or
later.