Change points in a time course
Rather than only calling Sys.sleep(), we now want to do some actual computation. As a toy problem, we choose the inference of change points in a time course with repeated observations of the same measurement.
First, we simulate data with an increasing integer x coordinate (the time point) and a continuous y coordinate (the observation value). We then use the mcp package to detect a point in x where the value of y changes abruptly. This package can infer many different kinds of change points; here, we make use of changing intercepts with constant, normally distributed noise. The package is built on rjags, which uses Gibbs sampling, a Markov chain Monte Carlo (MCMC) method.
MCMC sampling is usually computationally expensive, but the individual sampling chains are independent of each other, which makes this kind of problem well suited to extensive parallelization.
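As a small preview of what this looks like in practice, mcp exposes chain-level parallelism via its cores argument, which we will use again further below (a minimal sketch; the model syntax, a list of formulas, is explained in the following sections, and the number of chains assumes mcp's default):

library(mcp)

# toy data: 100 time points of pure noise, just to have something to fit
sim = data.frame(x = 1:100, y = rnorm(100))

# fit a two-segment model; cores = 3 samples the chains (three by default) in parallel
fit = mcp(list(y ~ 1, ~ 1), data = sim, par_x = "x", cores = 3)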
Fortunately, the rjags
package is already provided as a
module, so we can load it using the following command:
module load rjags/4-10-R-4.1.2
After running this, rjags should be available from within R via library(rjags). We still need to run install.packages("mcp"), but as this does not require external tools, it should work without issues. Note that the system-wide
R package library is not writable for individual users, so you have to
use your user library in your home directory (R will prompt you to do
this).
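Putting this together, a typical one-time setup inside R could look like the following (the exact user library path is chosen by R and will differ per user):

# run once: install mcp into your personal user library
# (R will offer to create one, e.g. under ~/R/, if it does not exist yet)
install.packages("mcp")

# afterwards both packages should load without errors
library(rjags)  # provided by the rjags/4-10-R-4.1.2 module
library(mcp)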
Creating a function to simulate data
We can use the following function to simulate our data:
# simulate one data set with random break points and y coordinates
simulate_data = function(len=1000, bps=c(0,1,2)) {
    sim = data.frame(x=seq_len(len), y=rnorm(len))
    for (bp in seq_len(sample(bps, 1))) {
        loc = sample(seq_len(len), 1)
        sim$y[loc:len] = sim$y[loc:len] + rnorm(1)
    }
    sim
}
This will randomly generate one sample with our x
and
y
coordinates. The resulting data should look something
like this:
sim = simulate_data(bps=1)
plot(sim)
# dev.off() # close the device to complete the plot
On Sulis, the plot will be saved as Rplots.pdf
.
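If you prefer a named output file over the default Rplots.pdf, you can open a PDF device explicitly (the file name sim.pdf is just an example; we use the same pattern in the solutions below):

pdf("sim.pdf")  # open a named PDF device instead of the default Rplots.pdf
plot(sim)
dev.off()       # close the device to complete the plot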
Running break point inference
We can now use our simulated data as a starting point to infer the
breakpoints that we simulated. Using the mcp
package, we
run three different models:
- One segment, no breakpoints
- Two segments, one breakpoint
- Three segments, two breakpoints
We then compare these models in terms of how well they fit the data and return the model that fits best:
library(mcp)

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}
The resulting model should look something like this:
mod = run_mcp(sim)
#> Compiling model graph
#> Resolving undeclared variables
#> Allocating nodes
#> Graph information:
#> Observed stochastic nodes: 1000
#> Unobserved stochastic nodes: 2
#> Total graph size: 5019
#>
#> Initializing model
#> Finished sampling in 18.3 seconds
#> Compiling model graph
#> Resolving undeclared variables
#> Allocating nodes
#> Graph information:
#> Observed stochastic nodes: 1000
#> Unobserved stochastic nodes: 4
#> Total graph size: 12020
#>
#> Initializing model
#> Finished sampling in 79.2 seconds
#> Compiling model graph
#> Resolving undeclared variables
#> Allocating nodes
#> Graph information:
#> Observed stochastic nodes: 1000
#> Unobserved stochastic nodes: 6
#> Total graph size: 17028
#>
#> Initializing model
#> Finished sampling in 146.4 seconds
#> Warning: Some Pareto k diagnostic values are too high. See help('pareto-k-diagnostic') for details.
#> Warning: Some Pareto k diagnostic values are too high. See help('pareto-k-diagnostic') for details.
plot(mod)
# dev.off() # close the device to complete the plot
Here, we see the x
and y
coordinates of our
observations as before, but in addition we see the traces of the
intercepts (grey lines) and the probability density of the break point
itself (blue line close to the bottom of the plot).
Exercise
- Use your editor to create the break point simulation script (hint: you can use Esc + :set paste in nvim to paste the text without automatic indentation, and :set nopaste to return)
- Start an interactive job with 1 task and 5 cores
- Simulate data for one and two break points
- Run the mcp inference script to try to estimate the break points from the data. Do they match the parameters of your simulation? (hint: you can use the plot function on an mcp model object; this will create an Rplots.pdf, after which you will need to call dev.off() and copy the file to your local machine)
- Which options do you see to make this code run faster? (hint: there is mcp-provided parallelism, and there are sequential steps in the inference code)
- How much runtime can you save by running computations in parallel?
Solution
Start the interactive job using:
srun --account su105 --ntasks=1 --cpus-per-task=5 --time 6:00:00 --pty $SHELL
In nvim
create the file compute_breakpt.r:
library(mcp)

simulate_data = function(len=1000, bps=c(0,1,2)) {
    sim = data.frame(x=seq_len(len), y=rnorm(len))
    for (bp in seq_len(sample(bps, 1))) {
        loc = sample(seq_len(len), 1)
        sim$y[loc:len] = sim$y[loc:len] + rnorm(1)
    }
    sim
}

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}
run_mcp_fast = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out;
    # changes vs. run_mcp: parallel::mclapply runs the three model fits (and later
    # their loo computations) in parallel, and cores=3L additionally samples each
    # model's MCMC chains in parallel
    fits = parallel::mclapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x", cores=3L))
    compare = as.data.frame(loo::loo_compare(parallel::mclapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}
one_bp = simulate_data(bps=1)
two_bp = simulate_data(bps=2)
system.time({ r1 = run_mcp(one_bp) })
system.time({ r2 = run_mcp_fast(one_bp) })
system.time({ r3 = run_mcp(two_bp) })
system.time({ r4 = run_mcp_fast(two_bp) })
pdf("breakpts.pdf")
plot(r1)
plot(r2)
plot(r3)
plot(r4)
dev.off()
We also see that run_mcp_fast is not 9 times as fast as run_mcp, even though it launches up to 9 processes (3 models with 3 chains each): the speedup is limited by (1) the longer run times of the more complex models and (2) the fact that we only requested 5 cores.
In general, we want to avoid “oversubscribing” the cores we have requested: running more parallel processes than cores will ultimately be slower than matching the parallelism to our allocation.
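One way to avoid oversubscription is to size the parallelism from the allocation itself instead of hard-coding it. A minimal sketch, assuming Slurm sets SLURM_CPUS_PER_TASK for the job (it does when --cpus-per-task is given) and falling back to 1 otherwise:

# read the number of cores Slurm allocated to this task; fall back to 1 if unset
n_cores = as.integer(Sys.getenv("SLURM_CPUS_PER_TASK", unset = "1"))

# cap the number of parallel processes at the allocation
parallel::mclapply(1:10, function(i) Sys.sleep(1), mc.cores = n_cores)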
Bigger data sets
The strategy of reserving a node with many CPUs works well up to the point where we want to run more computations in parallel than there are CPUs on a single node (modern nodes often have up to 128 cores/threads).
Here, we want to simulate such a large computational task, but to avoid using too many resources we will limit its overall size.
Exercise
- Simulate 10 breakpoint data sets using the function above, and save the resulting list in an .rds object (using saveRDS)
- Write a submission script with 10 tasks that loads the object, subsets it by the current task index, and saves the resulting model as .rds and the model plot as .pdf (hint: there are automatic environment variables available for each Slurm run, such as SLURM_PROCID)
Solution simulation
sim_breakpt.r
simulate_data = function(len=1000, bps=c(0,1,2)) {
    sim = data.frame(x=seq_len(len), y=rnorm(len))
    for (bp in seq_len(sample(bps, 1))) {
        loc = sample(seq_len(len), 1)
        sim$y[loc:len] = sim$y[loc:len] + rnorm(1)
    }
    sim
}
sims = replicate(10, simulate_data(), simplify=FALSE)
saveRDS(sims, file="sim_breakpt.rds")
Solution breakpoint inference
submit_breakpt.sh
#!/bin/sh
#SBATCH --account su105
#SBATCH --partition compute
#SBATCH --ntasks 10
#SBATCH --cpus-per-task 1
#SBATCH --mem 2048M
#SBATCH --time 8:00:00
srun Rscript compute_breakpt_manual.r
compute_breakpt_manual.r
library(mcp)

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

idx = Sys.getenv("SLURM_PROCID") # zero-indexed
dset = readRDS("sim_breakpt.rds")
cur_dset = dset[[as.integer(idx) + 1]] # one-indexed

res = run_mcp(cur_dset)

pdf(paste0("bp", idx, ".pdf"))
plot(res) # plot the fitted model so the PDF is not empty
dev.off()

saveRDS(res, file=paste0("bp", idx, ".rds"))
HPC-specific packages
There are also multiple packages available to make use of HPC resources from within R. That is, R will submit one or more jobs and retrieve the results back into the session.
There are, for instance, the packages BatchJobs and batchtools. These make use of the networked file system: each call and its arguments are written to a file, which is then read and executed by the job. These packages are robust for small numbers of jobs; however, they put a substantial strain on the file system for a high number of function calls.
Instead, we will introduce two packages that make only minimal use of the shared file system: slurmR and clustermq.
(Note that I am the author of the latter, so there’s a bit of a conflict
of interest here.)
There is also the Rmpi package, which provides an R cluster object (usable e.g. with parLapply) whose worker instances communicate across different nodes. There is an example in the Sulis docs; however, it requires additional setup in the job submission script.
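For completeness, a rough sketch of what the R side could look like, assuming Rmpi (and snow) are installed and the job was launched with the MPI setup described in the Sulis docs; this is not tested here:

library(parallel)

# spawn 10 workers via Rmpi/snow; these may be spread across several nodes
cl = makeCluster(10, type = "MPI")
parLapply(cl, 1:10, function(i) Sys.sleep(1))
stopCluster(cl)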
slurmR
The slurmR
package is a lightweight (dependency-free) R package that allows users
to interact with the scheduler.
It can be used to create a cluster object on Slurm tasks analogous to
parallel::makePSOCKcluster
. We do not need a
separate submission script, but we should run it in an interactive job to
avoid straining the login node. The command for this is
slurmR::makeSlurmCluster(ntasks)
, which can then be used
with the parLapply
or parSapply
functions:
# install.packages("slurmR") if you have not installed the package yet
library(slurmR)
opts_slurmR$set_preamble("module load rjags/4-10-R-4.1.2")
cl = makeSlurmCluster(5, account="su105")
system.time({ parSapply(cl, 1:10, function(i) Sys.sleep(1)) })
stopCluster(cl)
The sbatch
call that is created via
makeSlurmCluster
can be customized by passing different
parameters to the cluster creation function. In particular, we need
to:
- Include the right budgeting account (other sbatch options can be passed the same way; see the sketch below)
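A minimal sketch of passing additional options (partition and time as named arguments are assumptions here, following how account is passed above; check the Getting Started vignette for the authoritative interface):

library(slurmR)
library(parallel)

# extra named arguments are forwarded to the sbatch call, like `account` above;
# `partition` and `time` are assumptions, check the slurmR documentation
cl = makeSlurmCluster(5, account = "su105", partition = "compute", time = "01:00:00")
parSapply(cl, 1:10, function(i) Sys.sleep(1))
stopCluster(cl)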
More details can be found in their Getting Started vignette.
clustermq
The clustermq
package provides an interface to multiple HPC schedulers (including
Slurm) via the ZeroMQ socket library, available as a module:
module load GCCcore/11.2.0 ZeroMQ/4.3.4
# install.packages("clustermq") if you have not installed the package yet
library(clustermq)
Q(function(i) Sys.sleep(1), i=1:10, n_jobs=5)
The package relies on a submission template, which by default submits Slurm jobs as a job array. As we need to supply an account name, and the policy on Sulis is to use tasks whenever possible, we should modify the submission template to:
- Include the right budgeting account
- Use srun with multiple tasks per index in the job array (note: this is not actually working yet)
To address both points, we can create a new file
cmq_slurm.tmpl
with the following contents:
#!/bin/sh
#SBATCH --account={{ account | su105 }}
#SBATCH --job-name={{ job_name }}
#SBATCH --ntasks={{ n_jobs }}
#SBATCH --mem-per-cpu={{ memory | 1024M }}
#SBATCH --cpus-per-task={{ cores | 1 }}
module load rjags/4-10-R-4.1.2 # this we will need for our example
CMQ_AUTH={{ auth }} srun R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
We can then use this newly created template by:
options(clustermq.host = "ib0", # faster network
clustermq.template = "/path/to/cmq_slurm.tmpl")
We can either run these lines in an active R session, or add them to ~/.Rprofile so they are set automatically each time R is started.
This should make the above example run within tasks instead of an
array job. More information on how to configure and use
clustermq
is available in the User Guide vignette.
Exercise
- Run the above mcp example using one interactive “master” job (e.g. using the R integration with nvim), and:
  - Run the workers with slurmR
  - Run the workers with clustermq
Solution slurmR
If not in an interactive job yet, start it using
srun --account su105 --ntasks=1 --cpus-per-task=1 --time 6:00:00 --pty $SHELL
In nvim
create the file
compute_breakpt_slurmR.r:
library(slurmR)

run_mcp = function(sim) {
    library(mcp)  # load mcp on the worker process that runs this function
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}
dset = readRDS("sim_breakpt.rds")
cl = makeSlurmCluster(10, account="su105")
res = parLapply(cl, dset, run_mcp)
stopCluster(cl)
saveRDS(res, file="breakpt_slurmR.rds")
Solution clustermq
If not in an interactive job yet, start it using
srun --account su105 --ntasks=1 --cpus-per-task=1 --time 6:00:00 --pty $SHELL
In nvim
create the file
compute_breakpt_clustermq.r:
library(clustermq)

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}
dset = readRDS("sim_breakpt.rds")
res = Q(run_mcp, sim=dset, pkgs="mcp", n_jobs=10)
saveRDS(res, file="breakpt_clustermq.rds")