--- title: "Launcher plugins" output: rmarkdown::html_vignette: toc: true number_sections: true vignette: > %\VignetteIndexEntry{Launcher plugins} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) library(crew) ``` # About `crew` lets users write custom [launchers](https://wlandau.github.io/crew/reference/crew_class_launcher.html) for different types of workers that connect over the local network. The [`crew.cluster`](https://wlandau.github.io/crew.cluster/) package already has plugins for traditional high-performance computing schedulers ([SLURM](https://slurm.schedmd.com/), SGE, LSF, and PBS/TORQUE). # šŸ“£ Request for community contributions šŸ“£ **The launcher plugin framework aims to extend `crew` to modern platforms and services like [Google Cloud Run](https://cloud.google.com/run/), [Kubernetes](https://kubernetes.io/), and beyond. At the time of writing, plugins for cloud computing do not yet exist. So if you have access to these services and know how to use them, please consider contributing a package with plugins of your own. The maintainer of `crew` would love to work with you!** # How it works These [launcher](https://wlandau.github.io/crew/reference/crew_class_launcher.html) plugins need not become part of the `crew` package itself. You can write your plugin in a simple R script, or you write it in a custom R package that [depends on](https://r-pkgs.org/dependencies-in-practice.html) `crew`. Published packages with [launcher](https://wlandau.github.io/crew/reference/crew_class_launcher.html) plugins are powerful extensions that enhance `crew` for the entire open-source community. See [R Packages](https://r-pkgs.org/) by [Hadley Wickham](https://hadley.nz/) and [Jenny Bryan](https://jennybryan.org/) for how to write an R package. # Scope This vignette demonstrates how to write a `crew` launcher plugin. It assumes prior familiarity with [`R6` classes](https://r6.r-lib.org/articles/Introduction.html) and the computing platform of your plugin. # Implementation To create your own launcher plugin, write an [`R6`](https://r6.r-lib.org/articles/Introduction.html) subclass of [`crew_class_launcher`](https://wlandau.github.io/crew/reference/crew_class_launcher.html) with a [`launch_worker()`](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-launch-worker-) method analogous the one in the [local process launcher](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html). `launch_worker()` must accept the same arguments as the [local process `launch_worker()` method](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-launch-worker-), generate a call to [`crew_worker()`](https://wlandau.github.io/crew/reference/crew_worker.html), and then submit a new job or process to run that call. # Network Each worker that launches must be able to dial into the client over the local network. The `host` argument of `crew_client()` provides the local IP address, and the `port` argument provides the TCP port. The controller helper function (see below) should expose arguments `host` and `port` in order to solve potential network problems like [this one](https://github.com/wlandau/crew.cluster/issues/1#issuecomment-1546024163). By default, `host` is the local IP address. `crew` assumes the local network is secure. Please take the time to assess the network security risks of your computing environment. Use at your own risk. # Termination We recommend you implement an optional [`terminate_worker()`](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-terminate-worker-) method. Although `mirai` has its own way of terminating workers, it only works if the worker already connected, and it cannot reach workers that fail to connect and hang in a crashed state. An optional [`terminate_worker()`](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-terminate-worker-) method in your `crew` launcher plugin is extra assurance that these workers will exit. If you implement a custom [`terminate_worker()`](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-terminate-worker-) method, it must not throw an error (and should not throw a warning or message) if the worker is already terminated. In addition, it must accept a handle that identifies the worker, and this handle must be the return value of the previous call to [`launch_worker()`](https://wlandau.github.io/crew/reference/crew_class_launcher_local.html#method-launch-worker-). A handle can be any kind of R object: a process ID, a job name, an `R6` object returned by `callr::r_bg()`, etc. # Example The following is a custom custom launcher class whose workers are local R processes on Unix-like systems. ```r custom_launcher_class <- R6::R6Class( classname = "custom_launcher_class", inherit = crew::crew_class_launcher, public = list( launch_worker = function(call, name, launcher, worker, instance) { bin <- file.path(R.home("bin"), "Rscript") processx::process$new( command = bin, args = c(self$r_arguments, "-e", call), cleanup = FALSE ) }, terminate_worker = function(handle) { handle$signal(crew::crew_terminate_signal()) } ) ) ``` Inside `launch_worker()`, the `processx::process$new(command = bin, args = c(self$r_arguments, "-e", call))` line runs the [`crew_worker()`](https://wlandau.github.io/crew/reference/crew_worker.html) call in an external R process with the command line arguments from `r_arguments` (supplied when the launcher is created). This process runs in the background, connects back to `crew` and `mirai` over the local network, and accepts the tasks you push to the controller. `processx::process$new()` also returns a handle which the `terminate_worker()` method can use to force-terminate the process if appropriate. `mirai` has its own way to terminate workers, so a `terminate_worker()` method is not strictly required, but it is a useful safeguard in case a worker hangs in a crashed state before it establishes a connection. Every `launch_worker()` method must accept arguments `call`, `launcher`, `worker`, and `instance`. The method does not actually need to use all these arguments, but they must be present in the function signature. * `call`: text string with a call to [`crew_worker()`](https://wlandau.github.io/crew/reference/crew_worker.html). * `name`: text string with an informative name for the worker instance. * `launcher`: text string with the name of the launcher. * `worker`: positive integer index of the worker. Ranges from 1 to the maximum number of simultaneous workers configured for the controller. * `instance`: text string with the instance of the worker in the launcher at the given worker index. To see what the `call` object looks like, create a new launcher and run the `call()` method. ```r library(crew) launcher <- crew_launcher_local() launcher$call( socket = "ws://127.0.0.1:5000/3/aa9c59ea", launcher = "my_launcher", worker = 3L, instance = "aa9c59ea" ) #> [1] "crew::crew_worker(settings = list(url = \"ws://127.0.0.1:5000/3/aa9c59ea\", asyncdial = FALSE, autoexit = 15L, cleanup = 1L, output = TRUE, maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L, tls = NULL, rs = NULL), launcher = \"my_launcher\", worker = 3L, instance = \"aa9c59ea\", options_metrics = crew::crew_options_metrics(path = NULL, seconds_interval = 5))" ``` # Worker retries The `launch_worker()` method can call `self$crashes(index = worker)` to get the number of consecutive times the worker has crashed (exited without completing all its tasks). When a worker crashes, its crash count increments by 1. The next time a worker does not crash (exits only after completing all its tasks) the crash count resets back to zero. You can use the crash count to assign more memory, CPU cores, or other resources to a worker launch. Here is a pseudo-code sketch which increases available memory by 4 GB each time a worker crashes, then resets back to the original 16 GB when the worker succeeds: ```r launch_worker = function(call, name, launcher, worker, instance) { gigabytes_memory <- 16 + 4 * self$crashes(index = worker) submit_job(call, name, gigabytes_memory) } ``` This mechanism is similar to [retryable options in `crew.cluster`](https://wlandau.github.io/crew.cluster/reference/crew_options_slurm.html#retryable-options) and [in `crew.aws.batch`](https://wlandau.github.io/crew.aws.batch/reference/crew_options_aws_batch.html#retryable-options). # Helper It is useful to have a helper function that creates controllers with your custom launcher. It should: 1. Accept all the same arguments as [`crew_controller_local()`](https://wlandau.github.io/crew/reference/crew_controller_local.html). 2. Create a client object using [`crew_client()`](https://wlandau.github.io/crew/reference/crew_client.html). 3. Create a launcher object with the [`new()` method](https://wlandau.github.io/crew/reference/crew_class_launcher.html#method-crew_class_launcher-new) of your custom launcher class. 4. Create a new controller using [`crew_controller()`](https://wlandau.github.io/crew/reference/crew_controller.html). 5. Scan the controller for obvious errors using the [`validate()`](https://wlandau.github.io/crew/reference/crew_class_controller.html#method-crew_class_controller-validate) method of the controller. Feel free to borrow from the [`crew_controller_local()` source code](https://github.com/wlandau/crew/blob/main/R/crew_controller_local.R). For packages, you can use the `@inheritParams` [`roxygen2`](https://roxygen2.r-lib.org/) tag to inherit the documentation of all the arguments instead of writing it by hand. You may want to adjust the default arguments based on the specifics of your platform, especially `seconds_launch` if workers take a long time to launch. ```r #' @title Create a controller with the custom launcher. #' @export #' @description Create an `R6` object to submit tasks and #' launch workers. #' @inheritParams crew::crew_controller_local crew_controller_custom <- function( name = "custom controller name", workers = 1L, host = NULL, port = NULL, tls = crew::crew_tls(), seconds_interval = 0.5, seconds_timeout = 30, seconds_launch = 30, seconds_idle = Inf, seconds_wall = Inf, retry_tasks = TRUE, tasks_max = Inf, tasks_timers = 0L, reset_globals = TRUE, reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, crashes_error = 5L, r_arguments = NULL, options_metrics = crew::crew_options_metrics() ) { client <- crew::crew_client( name = name, workers = workers, host = host, port = port, tls = tls, seconds_interval = seconds_interval, seconds_timeout = seconds_timeout, retry_tasks = retry_tasks ) launcher <- custom_launcher_class$new( name = name, seconds_interval = seconds_interval, seconds_timeout = seconds_timeout, seconds_launch = seconds_launch, seconds_idle = seconds_idle, seconds_wall = seconds_wall, tasks_max = tasks_max, tasks_timers = tasks_timers, reset_globals = reset_globals, reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, crashes_error = crashes_error, tls = tls, r_arguments = r_arguments, options_metrics = options_metrics ) controller <- crew::crew_controller(client = client, launcher = launcher) controller$validate() controller } ``` # Informal testing Before you begin testing, please begin monitoring local processes and remote jobs on your platform. In the case of the above `crew` launcher which only creates local processes, it is sufficient to start [`htop`](https://htop.dev/) and filter for R processes, or launch a new R session to monitor the process table from [`ps::ps()`](https://ps.r-lib.org/reference/ps.html). However, for more ambitious launchers that submit workers to e.g. [AWS Batch](https://aws.amazon.com/batch/), you may need to open the [CloudWatch](https://aws.amazon.com/cloudwatch/) dashboard, then view the AWS billing dashboard after testing. When you are ready to begin testing, try out the example in the [README](https://wlandau.github.io/crew/index.html#usage), but use your your custom controller helper instead of [`crew_controller_local()`](https://wlandau.github.io/crew/reference/crew_controller_local.html). First, create and start a controller. You may wish to monitor local processes on your computer to make sure the `mirai` dispatcher starts. ```r library(crew) controller <- crew_controller_custom(workers = 2) controller$start() ``` Try pushing a task that gets the local IP address and process ID of the worker instance. ```r controller$push( name = "get worker IP address and process ID", command = paste(getip::getip(type = "local"), ps::ps_pid()) ) ``` Wait for the task to complete and look at the result. ```r controller$wait() result <- controller$pop() result$result[[1]] #> [1] "192.168.0.2 27336" ``` Please use the result to verify that the task really ran on a worker as intended. The process ID above should agree with the one from the handle ([except on Windows](https://github.com/r-lib/processx/issues/364) because the actual R process may be different from the `Rscript.exe` process created first). In addition, if the worker is running on a different computer, the worker IP address should be different than the local IP address. Since our custom launcher creates local processes, the IP addresses are the same in this case, but they should be different for a [SLURM](https://slurm.schedmd.com/) or [AWS Batch](https://aws.amazon.com/batch/) launcher. ```r getip::getip(type = "local") #> "192.168.0.2" controller$launcher$workers$handle[[1]]$get_pid() #> [1] 27336 ``` If you did not set any timeouts or task limits, the worker that ran the task should still be online. The other worker had no tasks, so it did not need to launch. ```r controller$client$summary() #> # A tibble: 2 Ɨ 6 #> worker online instances assigned complete socket #> #> 1 1 TRUE 1 1 1 ws://10.0.0.32:50258/1/571bcda7ā€¦ #> 2 2 FALSE 0 0 0 ws://10.0.0.32:50258/2/daf88d6eā€¦ ``` When you are done, terminate the controller. This terminates the `mirai` dispatcher process and the `crew` workers. ```r controller$terminate() ``` Finally, use the process monitoring interface of your computing platform or operating system to verify that all `mirai` dispatchers and `crew` workers are terminated. # Load testing If the informal testing succeeded, we recommend you scale up testing to more ambitious scenarios. As one example, you can test that your workers can auto-scale and quickly churn through a large number of tasks. ```r library(crew) controller <- crew_controller_custom( seconds_idle = 2L, workers = 2L ) controller$start() # Push 100 tasks for (index in seq_len(100L)) { name <- paste0("task_", index) controller$push(name = name, command = index, data = list(index = index)) message(paste("push", name)) } # Wait for the tasks to complete. controller$wait() # Wait for the workers to idle out and exit on their own. crew_retry( ~all(controller$client$summary()$online == FALSE), seconds_interval = 1, seconds_timeout = 60 ) # Do the same for 100 more tasks. for (index in (seq_len(100L) + 100L)) { name <- paste0("task_", index) controller$push(name = name, command = index, data = list(index = index)) message(paste("push", name)) } controller$wait() crew_retry( ~all(controller$client$summary()$online == FALSE), seconds_interval = 1, seconds_timeout = 60 ) # Collect the results. results <- NULL while (!is.null(result <- controller$pop(scale = FALSE))) { if (!is.null(result)) { results <- dplyr::bind_rows(results, result) } } # Check the results all(sort(unlist(results$result)) == seq_len(200L)) #> [1] TRUE # View worker and task summaries. controller$summary() controller$client$summary() controller$launcher$summary() # Terminate the controller. controller$terminate() # Now outside crew, verify that the mirai dispatcher # and crew workers successfully terminated. ``` # Asynchrony Depending on the launcher plugin, worker launches and terminations can be time-consuming. For example, each HTTP request to AWS Batch can take a couple seconds, and this latency becomes burdensome when it there are hundreds of workers. Fortunately, `crew` launchers can run launches and terminations asynchronously. As a launcher plugin developer, all you need to do is: 1. Expose the `processes` argument of `launcher$new()`. The `processes` field sets how many `mirai` daemons run locally and churn through quick requests. 2. Execute launches and terminations inside `self$async$eval()`, and return the resulting value from `launch_worker()` and `terminate_worker()`. Let's demonstrate on the simple `processx` example. The use case itself may silly because the workers are local `processx` processes, but the same principles apply if you replace `processx` with a cloud computing service like AWS Batch and you replace the process IDs with AWS Batch job IDs. Here is what the launcher class looks like. We work with `processx` PIDs directly because they are light and easy to send to local async `mirai` daemons. The `self$async$eval()` function accepts R code, data, and packages to run a quick local asynchronous task, and it returns a `mirai::mirai()` task object as the handle. `handle$data` returns the results if available, and `crew` uses `mirai::call_mirai()` to make sure any tasks submitted by `launch_worker()` have resolved before they are used by `terminate_worker()`. ```r async_launcher_class <- R6::R6Class( classname = "custom_launcher_class", inherit = crew::crew_class_launcher, public = list( launch_worker = function(call, name, launcher, worker, instance) { self$async$eval( command = list(pid = process$new(bin, args = c("-e", call))$get_pid()), data = list(bin = file.path(R.home("bin"), "R"), call = call), packages = "processx" ) }, terminate_worker = function(handle) { self$async$eval( command = crew::crew_terminate_process(handle$data$pid), data = list(pid = handle$data$pid) ) } ) ) ``` The controller helper includes a `processes` argument which sets how many asynchronous `mirai` daemons to create. Set `processes` to `NULL` to disable async and use it like an ordinary synchronous controller. ```r crew_controller_async <- function( name = "async controller name", workers = 1L, host = "127.0.0.1", port = NULL, tls = crew::crew_tls(mode = "none"), seconds_interval = 0.5, seconds_timeout = 30, seconds_launch = 30, seconds_idle = Inf, seconds_wall = Inf, tasks_max = Inf, tasks_timers = 0L, reset_globals = TRUE, reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, crashes_error = 5L, processes = NULL # Number of local async daemons for worker launches etc. ) { client <- crew::crew_client( name = name, workers = workers, host = host, port = port, tls = tls, seconds_interval = seconds_interval, seconds_timeout = seconds_timeout ) launcher <- async_launcher_class$new( name = name, seconds_interval = seconds_interval, seconds_launch = seconds_launch, seconds_idle = seconds_idle, seconds_wall = seconds_wall, tasks_max = tasks_max, tasks_timers = tasks_timers, reset_globals = reset_globals, reset_packages = reset_packages, reset_options = reset_options, garbage_collection = garbage_collection, crashes_error = crashes_error, tls = tls, processes = processes ) controller <- crew::crew_controller( client = client, launcher = launcher ) controller$validate() controller } ``` Creating a controller is the same as before, except the user sets both the `workers` and `processes` arguments. Remember, these are two different things: `workers` is the number of serious workers that run serious tasks from `push()`, whereas `processes` is the number of `mirai` daemons that asynchronously launch and terminate those serious workers. Workers may or may not be local, but `processes` are always local. ```r async_controller <- crew_controller_async(workers = 12, processes = 4) ``` `async_controller$start()` automatically launches 4 local processes to asynchronously auto-scale the workers, and `async_controller$terminate()` automatically shuts down those 4 processes. Beyond that, usage is the exactly same as before. # Managing workers Usually `crew` workers terminate themselves when the parent R session exits or the controller terminates, but under rare circumstances they may continue running. To help users of your plugin monitor and manually terminate workers, please consider implementing job management utilities to go with your launcher plugin. As described in the [introduction vignette](https://wlandau.github.io/crew/articles/introduction.html), `crew_monitor_local()` helps manually list and terminate local processes relevant to `crew`. Source code for the local monitor is [on GitHub](https://github.com/wlandau/crew/blob/main/R/crew_monitor_local.R), methods are [documented in the package website](https://wlandau.github.io/crew/reference/crew_class_monitor_local.html), and example usage is in the [introduction vignette](https://wlandau.github.io/crew/articles/introduction.html). In addition, [`crew_monitor_aws_batch()`](https://wlandau.github.io/crew.aws.batch/reference/crew_monitor_aws_batch.html) implements [several methods](https://wlandau.github.io/crew.aws.batch/reference/crew_class_monitor_aws_batch.html) for listing and terminating AWS Batch jobs, as well as viewing CloudWatch logs. The source code for the local monitor is copied below: ```r crew_monitor_local <- function() { crew_class_monitor_local$new() } crew_class_monitor_local <- R6::R6Class( classname = "crew_class_monitor_local", cloneable = FALSE, public = list( dispatchers = function() { crew_monitor_pids(pattern = "mirai::dispatcher") }, daemons = function() { crew_monitor_pids(pattern = "mirai::daemon") }, workers = function() { crew_monitor_pids(pattern = "crew::crew_worker") }, terminate = function(pids) { lapply(as.integer(pids), crew::crew_terminate_process) } ) ) crew_monitor_pids <- function(pattern) { processes <- ps::ps() commands <- map( processes$ps_handle, ~tryCatch(ps::ps_cmdline(.x), error = function(condition) "") ) filter <- grepl(pattern = pattern, x = as.character(commands), fixed = TRUE) as.integer(sort(processes$pid[filter])) } ``` Example usage: ```r monitor <- crew_monitor_local() monitor$dispatchers() # List PIDs of all local {mirai} dispatcher processes. #> [1] 31215 monitor$daemons() #> integer(0) monitor$workers() #> [1] 57001 57002 monitor$terminate(pid = c(57001, 57002)) monitor$workers() #> integer(0) ```