Installation
Install the clustermq package in R from CRAN (including the bundled ZeroMQ system library):
install.packages('clustermq')
Alternatively, you can use the remotes package to install directly from GitHub:
# install.packages('remotes')
remotes::install_github('mschubert/clustermq')
In the develop branch, we will introduce code changes and new features. These may contain bugs, poor documentation, or other inconveniences. This branch may not install at times. However, feedback is very welcome.
# install.packages('remotes')
remotes::install_github('mschubert/clustermq', ref="develop")
Configuration
Choose your preferred parallelism using:
options(clustermq.scheduler = "your scheduler here")
There are three kinds of schedulers:
- LOCAL - sequential processing of calls (default if no HPC scheduler found)
- Multiple cores on the same machine
- HPC schedulers
Parallel and HPC schedulers can also be used via SSH.
Local parallelization
While this is not the main focus of the package, you can use it to parallelize function calls locally on multiple cores or processes. This can also be useful to test your code before submitting it to a scheduler.
- Multiprocess (recommended) - Uses the callr package to run and manage multiple parallel R processes with options(clustermq.scheduler="multiprocess")
- Multicore - Uses the parallel package to fork the current R process into multiple threads with options(clustermq.scheduler="multicore"). This sometimes causes problems (macOS, RStudio) and is not available on Windows.
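For example, a minimal local run could look like the following (the function and inputs are just placeholders):
options(clustermq.scheduler = "multiprocess")
library(clustermq)
# double each element, distributed over 2 local R processes
Q(function(x) x * 2, x = 1:10, n_jobs = 2)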
Setting up the scheduler
An HPC cluster’s scheduler ensures that computing jobs are distributed to available worker nodes. Hence, this is what clustermq interfaces with in order to do computations.
By default, we will take whichever scheduler we find and fall back on local processing. This will work in most, but not all cases.
To set up a scheduler explicitly, see the following links:
- LSF - should work without setup
- SGE - should work without setup
- SLURM - should work without setup
- PBS/Torque - needs options(clustermq.scheduler="PBS") or options(clustermq.scheduler="Torque")
- if you want another scheduler, open an issue
Default submission templates are provided and can be customized, e.g. to activate compute environments or containers.
SSH connector
There are reasons why you might prefer not to work on the computing cluster directly but rather on your local machine instead. RStudio is an excellent local IDE; it’s more responsive and feature-rich than browser-based solutions (RStudio Server, Project Jupyter), and it avoids X forwarding issues when you want to look at plots you just made.
Using this setup, however, you lose access to the computing cluster. Instead, you have to copy your data there, submit individual scripts as jobs, and aggregate the data again at the end. clustermq tries to solve this by providing a transparent SSH interface.
In order to use clustermq from your local machine, the package needs to be installed both there and on the computing cluster. On the computing cluster, set up your scheduler and make sure clustermq runs there without problems. Note that the remote scheduler can not be LOCAL (the default if no HPC scheduler is found) or SSH for this to work.
# If this is set to 'LOCAL' or 'SSH' you will get the following error:
# Expected PROXY_READY, received ‘PROXY_ERROR: Remote SSH QSys is not allowed’
options(
clustermq.scheduler = "multiprocess" # or multicore, LSF, SGE, Slurm etc.
)
On your local machine, add the following options to your ~/.Rprofile:
options(
clustermq.scheduler = "ssh",
clustermq.ssh.host = "user@host", # use your user and host, obviously
clustermq.ssh.log = "~/cmq_ssh.log" # log for easier debugging
)
We recommend that you set up SSH keys for password-less login.
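Once both sides are set up, a quick way to check the connection from your local R session is a trivial call (any small function will do):
library(clustermq)
# submits a single test job to the remote scheduler via the SSH connector
Q(identity, x = 42, n_jobs = 1)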
Usage
The Q function
The following arguments are supported by Q:
- fun - The function to call. This needs to be self-sufficient (because it will not have access to the master environment)
- ... - All iterated arguments passed to the function. If there is more than one, all of them need to be named
- const - A named list of non-iterated arguments passed to fun
- export - A named list of objects to export to the worker environment
Behavior can further be fine-tuned using the options below:
- fail_on_error - Whether to stop if one of the calls returns an error
- seed - A common seed that is combined with the job number for reproducible results
- memory - Amount of memory to request for the job (bsub -M)
- n_jobs - Number of jobs to submit for all the function calls
- job_size - Number of function calls per job. If used in combination with n_jobs, the latter will be the overall limit
- chunk_size - How many calls a worker should process before reporting back to the master. Default: every worker will report back 100 times total
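As a rough sketch of how these fit together (the values below are arbitrary, not recommendations):
fx = function(x) x^2
Q(fx, x = 1:300,
  job_size = 100,        # 100 calls per job, i.e. 3 jobs in total
  memory = 2048,         # request about 2 GB per job
  seed = 42,             # reproducible per-call seeds
  fail_on_error = FALSE) # do not stop if individual calls error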
The full documentation is available by typing ?Q.
Examples
The package is designed to distribute arbitrary function calls on HPC worker nodes. There are, however, a couple of caveats to observe as the R session running on a worker does not share your local memory.
The simplest example is a function call that is completely self-sufficient, with one argument (x) that we iterate through:
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 2
#>
#> [[2]]
#> [1] 4
#>
#> [[3]]
#> [1] 6
Non-iterated arguments are supported by the const argument:
fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
If a function relies on objects in its environment that are not passed as arguments (including other functions), they can be exported using the export argument:
fx = function(x) x * 2 + y
Q(fx, x=1:3, export=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
If we want to use a package function, we need to load the package on the worker using the pkgs argument, or reference the function with the package_name:: prefix:
fx = function(x) {
x %>%
mutate(area = Sepal.Length * Sepal.Width) %>%
head()
}
Q(fx, x=list(iris), pkgs="dplyr", n_jobs=1)
#> Running sequentially ('LOCAL') ...
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
#> [[1]]
#> Sepal.Length Sepal.Width Petal.Length Petal.Width Species area
#> 1 5.1 3.5 1.4 0.2 setosa 17.85
#> 2 4.9 3.0 1.4 0.2 setosa 14.70
#> 3 4.7 3.2 1.3 0.2 setosa 15.04
#> 4 4.6 3.1 1.5 0.2 setosa 14.26
#> 5 5.0 3.6 1.4 0.2 setosa 18.00
#> 6 5.4 3.9 1.7 0.4 setosa 21.06
As parallel foreach backend
The foreach package provides an interface to perform repeated tasks on different backends. While it can perform the function of simple loops using %do%:
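library(foreach)
x = foreach(i=1:3) %do% sqrt(i) # simple sequential loop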
it can also perform these operations in parallel using %dopar%:
x = foreach(i=1:3) %dopar% sqrt(i)
#> Warning: executing %dopar% sequentially: no parallel backend registered
The latter allows registering different handlers for parallel execution, where we can use clustermq:
# set up the scheduler first, otherwise this will run sequentially
clustermq::register_dopar_cmq(n_jobs=2, memory=1024) # this accepts same arguments as `Q`
x = foreach(i=1:3) %dopar% sqrt(i) # this will be executed as jobs
#> Running sequentially ('LOCAL') ...
As BiocParallel supports foreach too, this means we can run all packages that use BiocParallel on the cluster as well via DoparParam.
library(BiocParallel)
register(DoparParam()) # after register_dopar_cmq(...)
bplapply(1:3, sqrt)
With drake
The drake package enables users to define a dependency structure of different function calls, and only evaluate them if the underlying data changed.
drake, or Data Frames in R for Make, is a general-purpose workflow manager for data-driven tasks. It rebuilds intermediate data objects when their dependencies change, and it skips work when the results are already up to date. Not every run starts from scratch, and completed workflows provide tangible evidence of reproducibility. drake also supports scalability, parallel computing, and a smooth user experience when it comes to setting up, deploying, and maintaining data science projects.
It can use clustermq to perform calculations as jobs:
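# a sketch using drake's bundled mtcars example; adapt it to your own plan
# (set up your clustermq scheduler first, e.g. options(clustermq.scheduler = "multicore"))
library(drake)
load_mtcars_example()
make(my_plan, parallelism = "clustermq", jobs = 2)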
Options
The various configurable options are mentioned throughout the documentation where applicable; we nevertheless list all of them here for reference.
Options can be set by including a call to options(<key> = <value>) in your .Rprofile, or by calling options(<key> = <value>) in a script or interactively during a session.
- clustermq.scheduler - One of the supported clustermq schedulers; options are "LOCAL", "multiprocess", "multicore", "lsf", "sge", "slurm", "pbs", "Torque", or "ssh" (default is the HPC scheduler found in $PATH, otherwise "LOCAL")
- clustermq.host - The name of the node or device for constructing the ZeroMQ host address (default is Sys.info()["nodename"])
- clustermq.ssh.host - The user name and host for connecting to the HPC via SSH (e.g. user@host); we recommend setting up SSH keys for password-less login
- clustermq.ssh.log - Path for a file (on the SSH host) that will be created and populated with logging information regarding the SSH connection (e.g. "~/cmq_ssh.log"); helpful for debugging purposes
- clustermq.ssh.timeout - The amount of time to wait (in seconds) for an SSH start-up connection before timing out (default is 5 seconds)
- clustermq.worker.timeout - The amount of time to wait (in seconds) for master-worker communication before timing out (default is 600 seconds)
- clustermq.error.timeout - The amount of time to wait (in seconds), in case of a worker error, for the remaining workers to finish their computations and shut down cleanly (default is min(timeout, 30) seconds)
- clustermq.template - Path to a template file for submitting HPC jobs; only necessary if using your own template, otherwise the default template will be used (default depends on clustermq.scheduler)
- clustermq.data.warning - The threshold for the size of the common data (in Mb) before clustermq throws a warning (default is 1000)
- clustermq.defaults - A named list of default values for the HPC template; this takes precedence over defaults specified in the template file (default is an empty list, i.e. list())
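For example, a cluster-side ~/.Rprofile might combine several of these (the values below are placeholders, not recommendations):
options(
    clustermq.scheduler = "slurm",
    clustermq.template = "~/clustermq_slurm.tmpl", # only if using a custom template
    clustermq.defaults = list(memory = 8192)       # default value for a template field
)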
Troubleshooting
Debugging workers
Function calls evaluated by workers are wrapped in event handlers, which means that even if a call evaluation throws an error, this should be reported back to the main R session.
However, there are reasons why workers might crash, in which case they cannot report back. These include:
- A segfault in a low-level process
- Process kill due to resource constraints (e.g. walltime)
- Reaching the wait timeout without any signal from the master process
- Probably others
In this case, it is useful to have the worker(s) create a log file that will also include events that are not reported back. It can be requested using:
Q(..., log_worker=TRUE)
This will create a log file for the worker output. You can customize the file name via the log_file template field, for example:
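Q(..., template=list(log_file="/path/to/file.log")) # path is a placeholder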
Note that in this case log_file is a template field of your scheduler script, and hence needs to be present there in order for this to work. The default templates all have this field included.
In order to log each worker separately, some schedulers support wildcards in their log file names. For instance:
- Multicore/Multiprocess: log_file="/path/to.file.%i"
- SGE: log_file="/path/to.file.\$TASK_ID"
- LSF: log_file="/path/to.file.%I"
- Slurm: log_file="/path/to.file.%a"
- PBS: log_file="/path/to.file.$PBS_ARRAY_INDEX"
- Torque: log_file="/path/to.file.$PBS_ARRAYID"
Your scheduler documentation will have more details about the available options.
When reporting a bug that includes worker crashes, please always include a log file.
SSH
Before trying remote schedulers via SSH, make sure that the scheduler works when you first connect to the cluster and run a job from there.
If the terminal is stuck at
Connecting <user@host> via SSH ...
make sure that each step of your SSH connection works by typing the following commands in your local terminal, and that you do not get errors or warnings at any step:
# test your ssh login that you set up in ~/.ssh/config
# if this fails you have not set up SSH correctly
ssh <user@host>
# test port forwarding from 54709 remote to 6687 local (ports are random)
# if this fails you will not be able to use clustermq via SSH
ssh -R 54709:localhost:6687 <user@host> R --vanilla
If you get a Command not found: R error, make sure your $PATH is set up correctly in your ~/.bash_profile and/or your ~/.bashrc (depending on your cluster config you might need either). You may also need to modify your SSH template to load R as a module or conda environment.
If you get an SSH warning or error, try again with ssh -v to enable verbose output. If the forward itself works, run the following in your local R session (ideally also in command-line R, not only in RStudio):
options(clustermq.scheduler = "ssh",
clustermq.ssh.log = "~/ssh_proxy.log")
Q(identity, x=1, n_jobs=1)
This will create a log file on the remote server that will contain any errors that might have occurred during ssh_proxy startup.
If the ssh_proxy startup fails on your local machine with the error
Remote R process did not respond after 5 seconds. Check your SSH server log.
but the server log does not show any errors, then you can try increasing the timeout:
options(clustermq.ssh.timeout = 30) # in seconds
This can happen when your SSH startup template includes additional steps before starting R, such as activating a module or conda environment, or having to confirm the connection via two-factor authentication.
Environments
Environments for workers
In some cases, it may be necessary to activate a specific computing environment on the scheduler jobs prior to starting up the worker. This can be, for instance, because R was only installed in a specific environment or container.
Examples for such environments or containers are:
- Bash module environments
- Conda environments
- Docker/Singularity containers
It should be possible to activate them in the job submission script (i.e., the template file). This is largely untested, but it would look like the following for the LSF scheduler (and analogously for others):
#BSUB-J {{ job_name }}[1-{{ n_jobs }}] # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }} # stdout + stderr
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }} # walltime (uncomment)
module load {{ bashenv | default_bash_env }}
# or: source activate {{ conda | default_conda_env_name }}
# or: your environment activation command
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
This template still needs to be filled in, so in the above example you need to pass the environment name either directly when calling Q (bashenv is the field used in the template above; the value is a placeholder):
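Q(..., template=list(bashenv="my environment name"))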
or set it via an .Rprofile option:
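options(
    clustermq.defaults = list(bashenv="my default env") # placeholder value
)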
Running master inside containers
If your master process is inside a container, accessing the HPC scheduler is more difficult. Containers, including singularity and docker, isolate the processes inside the container from the host. The R process will not be able to submit a job because the scheduler cannot be found.
Note that the HPC node running the master process must be allowed to submit jobs. Not all HPC systems allow compute nodes to submit jobs. If that is the case, you may need to run the master process on the login node, and discuss the issue with your system administrator.
If your container is binary compatible with the host, you may be able to bind in the scheduler executable to the container.
For example, PBS might look something like:
#PBS directives ...
module load singularity
export SINGULARITYENV_APPEND_PATH=/opt/pbs/bin
singularity exec --bind /opt/pbs/bin r_image.sif Rscript master_script.R
A working example of binding SLURM into a CentOS 7 container image from a CentOS 7 host is available at https://groups.google.com/a/lbl.gov/d/msg/singularity/syLcsIWWzdo/NZvF2Ud2AAAJ
Alternatively, you can create a script that uses SSH to execute the scheduler on the login node. For this, you will need an SSH client in the container, keys set up for password-less login, and a script that calls the scheduler on the login node via ssh (e.g. ~/bin/qsub for SGE/PBS/Torque, bsub for LSF and sbatch for Slurm).
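A minimal sketch of such a wrapper for qsub (the login host and key path are placeholders; use bsub or sbatch as appropriate):
#!/bin/bash
# forward the submission command to the login node via SSH
ssh -i ~/.ssh/id_rsa login-node qsub "$@"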
Make sure the script is executable, and bind/copy it into the container somewhere on $PATH. Home directories are bound in by default in singularity.
Scheduler templates
LSF
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "lsf",
clustermq.template = "/path/to/file/below" # if using your own template
)
The option clustermq.template should point to an LSF template file like the one below (only needed if you want to supply your own template rather than using the default).
#BSUB-J {{ job_name }}[1-{{ n_jobs }}] # name of the job / array jobs
#BSUB-n {{ cores | 1 }} # number of cores to use per job
#BSUB-o {{ log_file | /dev/null }} # stdout + stderr; %I for array index
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }} # walltime (uncomment)
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #BSUB-* defines command-line arguments to the bsub program.
- Memory: defined by BSUB-M and BSUB-R. Check your local setup whether the memory values supplied are in MiB or KiB; the default is 4096 if no memory is requested when calling Q()
- Queue: BSUB-q default. Use the queue named default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
- Walltime: BSUB-W {{ walltime }}. Set the maximum time a job is allowed to run before being killed. The default here is to disable this line. If you enable it, enter a fixed value or pass the walltime argument to each function call. As written, it will use 6 hours if no argument is given.
- For other options, see the LSF documentation and add them via #BSUB-* (where * represents the argument)
- Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables
Once this is done, the package will use your settings and no longer warn you of the missing options.
SGE
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "sge",
clustermq.template = "/path/to/file/below" # if using your own template
)
The option clustermq.template should point to an SGE template file like the one below (only needed if you want to supply your own template rather than using the default).
#$ -N {{ job_name }} # job name
#$ -q default # submit to queue named "default"
#$ -j y # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }} # output file
#$ -cwd # use pwd as work dir
#$ -V # use environment variable
#$ -t 1-{{ n_jobs }} # submit jobs as array
#$ -pe smp {{ cores | 1 }} # number of cores per job (the parallel environment name may differ on your cluster)
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #$ -* defines command-line arguments to the qsub program.
- Queue: #$ -q default. Use the queue named default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
- For other options, see the SGE documentation. Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
SLURM
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "slurm",
clustermq.template = "/path/to/file/below" # if using your own template
)
The option clustermq.template should point to a SLURM template file like the one below (only needed if you want to supply your own template rather than using the default).
#!/bin/sh
#SBATCH --job-name={{ job_name }}
#SBATCH --partition=default
#SBATCH --output={{ log_file | /dev/null }} # you can add .%a for array index
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 4096 }}
#SBATCH --array=1-{{ n_jobs }}
#SBATCH --cpus-per-task={{ cores | 1 }}
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #SBATCH defines command-line arguments to the sbatch program.
- Queue: #SBATCH --partition=default. Use the queue named default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
- For other options, see the SLURM documentation. Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
PBS
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "pbs",
clustermq.template = "/path/to/file/below" # if using your own template
)
The option clustermq.template should point to a PBS template file like the one below (only needed if you want to supply your own template rather than using the default).
#PBS -N {{ job_name }}
#PBS -J 1-{{ n_jobs }}
#PBS -l select=1:ncpus={{ cores | 1 }}:mpiprocs={{ cores | 1 }}:mem={{ memory | 4096 }}MB
#PBS -l walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
#PBS -q default
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #PBS -* defines command-line arguments to the qsub program.
- Queue: #PBS -q default. Use the queue named default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
- For other options, see the PBS documentation. Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
Torque
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "Torque",
clustermq.template = "/path/to/file/below" # if using your own template
)
The option clustermq.template should point to a Torque template file like the one below (only needed if you want to supply your own template rather than using the default).
#PBS -N {{ job_name }}
#PBS -l nodes={{ n_jobs }}:ppn={{ cores | 1 }},walltime={{ walltime | 12:00:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -q default
#PBS -j oe
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #PBS -* defines command-line arguments to the qsub program.
- Queue: #PBS -q default. Use the queue named default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
- For other options, see the Torque documentation. Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
SSH
While SSH is not a scheduler itself, it can be used to access remote schedulers. If you want to use it, first make sure that your real scheduler works when you connect to the HPC environment manually and submit a job from there.
options(clustermq.scheduler = "ssh",
clustermq.ssh.host = "myhost", # set this up in your local ~/.ssh/config
clustermq.ssh.log = "~/ssh_proxy.log", # log file on your HPC
clustermq.ssh.timeout = 30, # if changing the default connection timeout
clustermq.template = "/path/to/file/below" # if using your own template
)
The default template is shown below. If R is not in your HPC $PATH, you may need to specify its path or load the required bash modules/conda environments.