Skip to contents

Change points in a time course

In order to not only use Sys.sleep() but do some actual computations, we chose as a toy problem the inference of change points in a time course with repeated observations of the same measurement. First, we will start off with simulating data that has an increasing integer x coordinate (time point) and a continuous y coordinate (observation value).

We will then use the mcp package to detect a point in x where the value of y changes abruptly. This package is able to infer many different kinds of change points. Here, we make use of changing intercepts with constant, normally distributed noise. The package is built on rjags, which uses a Gibbs sampling Markov-Chain Monte Carlo (MCMC).

These computations are usually computationally expensive, but the individual sampling chains are independent of each other. This makes this kind of problem amenable to extensive parallelization.

Fortunately, the rjags package is already provided as a module, so we can load it using the following command:

module load rjags/4-10-R-4.1.2

After running this, it should be available from within R via library(rjags). We still need to install.packages("mcp"), but as this does not require external tools it should work without issues. Note that the system-wide R package library is not writable for individual users, so you have to use your user library in your home directory (R will prompt you to do this).

Creating a function to simulate data

We can use the following function to simulate our data:

# simulate one data set with random break points and y coordinates
simulate_data = function(len=1000, bps=c(0,1,2)) {
    sim = data.frame(x=seq_len(len), y=rnorm(len))
    for (bp in seq_len(sample(bps, 1))) {
        loc = sample(seq_len(len), 1)
        sim$y[loc:len] = sim$y[loc:len] + rnorm(1)
    }
    sim
}

This will randomly generate one sample with our x and y coordinates. The resulting data should look something like this:

sim = simulate_data(bps=1)
plot(sim)

# dev.off()  # close the device to complete the plot

On Sulis, the plot will be saved as Rplots.pdf.

Running break point inference

We can now use our simulated data as a starting point to infer the breakpoints that we simulated. Using the mcp package, we run three different models:

  • One segment, no breakpoints
  • Two segments, one breakpoint
  • Three segments, two breakpoints

When then compare these models in terms of how well they fit the data, and return the model that fits best:

library(mcp)

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

The resulting model should look something like this:

mod = run_mcp(sim)
#> Compiling model graph
#>    Resolving undeclared variables
#>    Allocating nodes
#> Graph information:
#>    Observed stochastic nodes: 1000
#>    Unobserved stochastic nodes: 2
#>    Total graph size: 5019
#> 
#> Initializing model
#> Finished sampling in 18.3 seconds
#> Compiling model graph
#>    Resolving undeclared variables
#>    Allocating nodes
#> Graph information:
#>    Observed stochastic nodes: 1000
#>    Unobserved stochastic nodes: 4
#>    Total graph size: 12020
#> 
#> Initializing model
#> Finished sampling in 79.2 seconds
#> Compiling model graph
#>    Resolving undeclared variables
#>    Allocating nodes
#> Graph information:
#>    Observed stochastic nodes: 1000
#>    Unobserved stochastic nodes: 6
#>    Total graph size: 17028
#> 
#> Initializing model
#> Finished sampling in 146.4 seconds
#> Warning: Some Pareto k diagnostic values are too high. See help('pareto-k-diagnostic') for details.

#> Warning: Some Pareto k diagnostic values are too high. See help('pareto-k-diagnostic') for details.
plot(mod)

# dev.off()  # close the device to complete the plot

Here, we see the x and y coordinates of our observations as before, but in addition we see the traces of the intercepts (grey lines) and the probability density of the break point itself (blue line close to the bottom of the plot).

Exercise

  • Use your editor to create the break point simulation script (hint: you can use Esc+:set paste in nvim to copy-paste the text without automatic indentation and :set nopaste to return)
  • Start an interactive job with 1 task and 5 cores
  • Simulate data for one and two break points
  • Run the mcp inference script to try to estimate the breakpoints from the data. Do they match to the parameters of your simulation? (hint: you can use the plot function on a mcp model object; this will create a Rplots.pdf after which you’ll need to dev.off() and copy to your local machine)
  • Which options do you see to make this code run faster? (hint: there is both mcp-provided parallelism and sequential steps in the inference code)
  • How much runtime can you save by running computations in parallel?
Solution

Start the interactive job using:

srun --account su105 --ntasks=1 --cpus-per-task=5 --time 6:00:00 --pty $SHELL

In nvim create the file compute_breakpt.r:

library(mcp)
simulate_data = function(len=1000, bps=c(0,1,2)) {
    sim = data.frame(x=seq_len(len), y=rnorm(len))
    for (bp in seq_len(sample(bps, 1))) {
        loc = sample(seq_len(len), 1)
        sim$y[loc:len] = sim$y[loc:len] + rnorm(1)
    }
    sim
}

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

run_mcp_fast = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = parallel::mclapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x", cores=3L))
#          ^^^^^^^^^^^^^^^^^^                                                    ^^^^^^^^
    compare = as.data.frame(loo::loo_compare(parallel::mclapply(fits, mcp::loo)))
#                                            ^^^^^^^^^^^^^^^^^^
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

one_bp = simulate_data(bps=1)
two_bp = simulate_data(bps=2)

system.time({ r1 = run_mcp(one_bp) })
system.time({ r2 = run_mcp_fast(one_bp) })

system.time({ r3 = run_mcp(two_bp) })
system.time({ r4 = run_mcp_fast(two_bp) })

pdf("breakpts.pdf")
plot(r1)
plot(r2)
plot(r3)
plot(r4)
dev.off()

We also see that run_mcp_fast is not 9 times as fast, but gets slowed down by (1) higher run times for more complex models, and (2) only the 5 cores we requested.

In general, we want to avoid “oversubscribing” the cores we have requested because they will ultimately run slower than if we use what we request.

Bigger data sets

The strategy of reserving a node with many CPUs works well up to the extent where we want to process more computations in parallel than there are CPUs on a given node (modern nodes often have up to 128 cores/threads).

Here, we want to simulate such a big computational task, but for this to not use too many resources we will limit the overall amount.

Exercise

  • Simulate 10 breakpoint data sets using the function above, and save the resulting list in an .rds object (using saveRDS)
  • Write a submission script with 10 tasks that load the object, subset the current task index, and save the resulting model as .rds and model plot as .pdf (hint: there are automatic environment variables available for each Slurm run, such as SLURM_PROCID)
Solution simulation

sim_breakpt.r

simulate_data = function(len=1000, bps=c(0,1,2)) {
    sim = data.frame(x=seq_len(len), y=rnorm(len))
    for (bp in seq_len(sample(bps, 1))) {
        loc = sample(seq_len(len), 1)
        sim$y[loc:len] = sim$y[loc:len] + rnorm(1)
    }
    sim
}

sims = replicate(10, simulate_data(), simplify=FALSE)
saveRDS(sims, file="sim_breakpt.rds")
Solution breakpoint inference

submit_breakpt.sh

#!/bin/sh
#SBATCH --account su105
#SBATCH --partition compute
#SBATCH --ntasks 10
#SBATCH --cpus-per-task 1
#SBATCH --mem 2048M
#SBATCH --time 8:00:00

srun Rscript compute_breakpt_manual.r

compute_breakpt_manual.r

library(mcp)

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

idx = Sys.getenv("SLURM_PROCID") # zero-indexed

dset = readRDS("sim_breakpt.rds")
cur_dset = dset[[as.integer(idx) + 1]] # one-indexed

res = run_mcp(cur_dset)
pdf(paste0("bp", idx, ".pdf"))
dev.off()
saveRDS(res, file=paste0("bp", idx, ".rds"))

HPC-specific packages

There are also multiple packages available to make use of HPC resources from within R. That is to say, that R will submit a job or multiple jobs, and retrieve the result back to the session.

There are, for instance, the packages BatchJobs and batchtools. These make use of the networked file system to write each call and its arguments to a file, that is then retrieved by the job and executed. These packages are robust for small numbers of jobs, however, they will put a substantial strain on the file system for a high number of function calls.

Instead, we will introduce two packages that make only little use of the shared file system, slurmR and clustermq. (Note that I am the author of the latter, so there’s a bit of a conflict of interest here.)

There is also the Rmpi package, which provides an R cluster object for e.g. parLapply of multiple instances communicating over different nodes. There is an example on the Sulis docs, however, it requires additional setup in the job submission script.

slurmR

The slurmR package is a lightweight (dependency-free) R package that allows users to interact with the scheduler.

It can be used to create a cluster object on Slurm tasks analogous to parallel::makePSOCKcluster. We do not need a separate submission script, but should run it in an interactive job to not strain the login node. The command for this is slurmR::makeSlurmCluster(ntasks), which can then be used with the parLapply or parSapply functions:

# install.packages("slurmR") if you have not installed the package yet
library(slurmR)

opts_slurmR$set_preamble("module load rjags/4-10-R-4.1.2")

cl = makeSlurmCluster(5, account="su105")

system.time({ parSapply(cl, 1:10, function(i) Sys.sleep(1)) })

stopCluster(cl)

The sbatch call that is created via makeSlurmCluster can be customized by passing different parameters to the cluster creation function. In particular, we need to:

  • Include the right budgeting account

More details can be found in their Getting Started vignette.

clustermq

The clustermq package provides an interface to multiple HPC schedulers (including Slurm) via the ZeroMQ socket library, available as a module:

module load GCCcore/11.2.0 ZeroMQ/4.3.4
# install.packages("clustermq") if you have not installed the package yet
library(clustermq)

Q(function(i) Sys.sleep(1), i=1:10, n_jobs=5)

The package relies on a submission template, which by default submits Slurm jobs as a job array. As we need to supply an account name, and the policy on Sulis is to use tasks whenever possible, we should modify the submission template to:

  • Include the right budgeting account
  • Use srun with multiple tasks per index in the job array this is not actually working (yet)

To address both points, we can create a new file cmq_slurm.tmpl with the following contents:

#!/bin/sh
#SBATCH --account={{ account | su105 }}
#SBATCH --job-name={{ job_name }}
#SBATCH --ntasks={{ n_jobs }}
#SBATCH --mem-per-cpu={{ memory | 1024M }}
#SBATCH --cpus-per-task={{ cores | 1 }}

module load rjags/4-10-R-4.1.2 # this we will need for our example

CMQ_AUTH={{ auth }} srun R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

We can then use this newly created template by:

options(clustermq.host = "ib0", # faster network
        clustermq.template = "/path/to/cmq_slurm.tmpl")

We can either run these lines in an active R session, or add them to ~/.Rprofile to set automatically each time R is started.

This should make the above example run within tasks instead of an array job. More information on how to configure and use clustermq is available at the User Guide vignette.

Exercise

  • Run the above mcp example using one interactive “master” job (e.g. using the R integration with nvim) and
    • Run the workers with slurmR
    • Run the workers with clustermq
Solution slurmR

If not in an interactive job yet, start it using

srun --account su105 --ntasks=1 --cpus-per-task=1 --time 6:00:00 --pty $SHELL

In nvim create the file compute_breakpt_slurmR.r:

library(slurmR)

run_mcp = function(sim) {
    library(mcp)
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

dset = readRDS("sim_breakpt.rds")

cl = makeSlurmCluster(10, account="su105")
res = parLapply(cl, dset, run_mcp)
stopCluster(cl)

saveRDS(res, file="breakpt_slurmR.rds")
Solution clustermq

If not in an interactive job yet, start it using

srun --account su105 --ntasks=1 --cpus-per-task=1 --time 6:00:00 --pty $SHELL

In nvim create the file compute_breakpt_clustermq.r:

library(clustermq)

run_mcp = function(sim) {
    # one unbroken segment, 2 segments with one break point, 3 segments 2 bps
    mods = list(list(y ~ 1),
                list(y ~ 1, ~ 1),
                list(y ~ 1, ~ 1, ~ 1))

    # fit all three models, select the best using leave-one-out
    fits = lapply(mods, function(m) mcp::mcp(m, data=sim, par_x="x"))
    compare = as.data.frame(loo::loo_compare(lapply(fits, mcp::loo)))
    best = as.integer(sub("model", "", rownames(compare)))[1]
    fits[[best]]
}

dset = readRDS("sim_breakpt.rds")

res = Q(run_mcp, sim=dset, pkgs="mcp", n_jobs=10)

saveRDS(res, file="breakpt_clustermq.rds")