The R community has developed several packages to take advantage of parallelism.
Many of these packages are simply wrappers around one or more other parallelism packages, forming a complex and sometimes confusing web of packages. Package parallel attempts to eliminate some of this confusion by wrapping snow and multicore into a single, convenient bundle.
Parallel computation is especially suited to “embarrassingly parallel” problems like large-scale simulations and by-group analyses.
Package Parallel
Parallel computation can be divided into explicit and implicit parallelism: with implicit parallelism the system handles the distribution of the work for us, while with explicit parallelism the programmer decides how the problem is divided and how the results are recombined.
Multicore parallelism
The multicore functionality, bundled within parallel, provides a way of running parallel computations in R on machines with multiple cores or CPUs by making use of operating system facilities. Because it is based on process forking, multicore currently works only on Unix-like systems.
Function mclapply() works just like the regular lapply() function for iterating across the elements of a list, but the iterations automatically run in parallel to speed up the computation.
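As a minimal sketch (a made-up toy task, not part of the examples below), the call squares the numbers 1 to 4 on two cores; apart from the mc.* arguments, the interface is the same as lapply():

# Toy sketch: run four tiny tasks on two cores. Because mclapply() relies
# on forking, this runs in parallel only on Unix-like systems.
library(parallel)
res <- mclapply(1:4, function(i) i^2, mc.cores = 2L)
unlist(res)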
Example: 2-dimensional function
We want to evaluate the two-dimensional function f() on a grid over [-10, 10] x [-10, 10] with 1,000 points along each axis, where f() is defined as:
f <- function(x) {
  r <- sqrt(x[1]^2 + x[2]^2)
  10 * sin(r) / r
}
We create the grid of evaluation points:
x <- seq(-10, 10, length.out = 1000)
grid <- expand.grid(x = x, y = x)
head(grid, n = 3)
##        x   y
## 1 -10.00 -10
## 2  -9.98 -10
## 3  -9.96 -10
The sequential calculation simply calls function f() for each row of the grid:
system.time({z <- apply(grid, 1, f)})
##    user  system elapsed 
##  14.964   0.147  15.129
The calculation takes about 15 seconds. To check the correctness of the result, we plot the function:
par(mfrow = c(1, 1))
dim(z) <- c(length(x), length(x))
persp(x, x, z, theta = 30, phi = 30, expand = 0.5,
      col = 'white', border = NA, shade = 0.3, box = FALSE)
A strategy for computing f() in parallel is to split the grid into subgrids and evaluate the function on these subgrids in parallel. With two cores available, the grid is split into two parts along y = 0:
grid_list <- split(grid, grid[, 'y'] > 0)
Now the apply() call from the sequential calculation is executed for each element of the list. A sequential execution is done using lapply():
system.time({z_list <- lapply(grid_list, apply, 1, f)})
##    user  system elapsed 
##   11.43    0.07   11.51
The parallel execution is done using the parallel analogue, mclapply():
library(parallel)
system.time({z_list <- mclapply(grid_list, apply, 1, f,
                                mc.cores = 2L, mc.preschedule = TRUE)})
##    user  system elapsed 
##   5.118   0.196   5.297
The parallel execution needs about half the time. In the background, mclapply() has forked two child processes, and each of them has computed the result for one list element. The two partial results can be visualised separately:
par(mfrow = c(1, 2))
# First half: z_list[[1]] holds the rows with y <= 0
x1 <- x[x <= 0]
z1 <- z_list[[1]]
dim(z1) <- c(length(x), length(x1))
persp(x, x1, z1, theta = 90, phi = 30, expand = 0.5,
      col = 'white', border = NA, shade = 0.3, box = FALSE)
# Second half: z_list[[2]] holds the rows with y > 0
x2 <- x[x > 0]
z2 <- z_list[[2]]
dim(z2) <- c(length(x), length(x2))
persp(x, x2, z2, theta = 90, phi = 30, expand = 0.5,
      col = 'white', border = NA, shade = 0.3, box = FALSE)
For the final result the elements of the list must be put together:
z <- Reduce(c, z_list)
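As an optional sanity check (a sketch), the re-assembled vector should match a fresh sequential computation, since split() preserves the original row order within each group and the y <= 0 group comes first:

# Sanity-check sketch: compare the combined parallel result with the
# sequential computation; all.equal() should report TRUE.
z_seq <- apply(grid, 1, f)
all.equal(as.numeric(z), as.numeric(z_seq))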
Example: Cross Validation
We can define a simple cross-validation function for lm-type objects as follows:
cross_val <- function(i, fm) {
  data <- fm$model
  formula <- formula(fm)
  response <- as.character(terms(fm)[[2]])
  fm <- lm(formula, data = data[-i, ])
  newdata <- data[i, ]
  predicted <- predict(fm, newdata)
  sqrt((predicted - data[i, response])^2)
}
We create a 5,000-row data frame and fit a linear regression model to it.
n <- 5 * 10^3
df <- data.frame(x = 1:n, y = 2 + 3 * c(1:n) + rnorm(n, 0, 1500))
df$y[800] <- 2 * 10^4
plot(y ~ x, data = df, pch = 16, cex = .5)
fm <- lm(y ~ x, data = df)
The previously created cross_val() function can be iterated through lapply() using a single core:
system.time({single <- lapply(1:n, cross_val, fm)})[3]
## elapsed 
##   30.79
op <- par(mfrow = c(1, 2))
plot(y ~ x, df, pch = 16, col = "red", cex = .6)
plot(unlist(single), type = "s", ylab = "Cross Validation", col = "darkgreen")
par(op)
By using mclapply(), parallelism is achieved straightforwardly, without the need to set up an explicit cluster environment:
system.time({
  quad <- mclapply(1:n, cross_val, fm,
                   mc.cores = 4L, mc.preschedule = TRUE)})[3]
## elapsed 
##   10.95
Clearly, both methods lead to the same results:
identical(single, quad)
## [1] TRUE
Summary statistics revisited
We can revisit the previous example about a generic summary function:
my_summary <- function(x, flist) {
  f <- function(f, ...) f(...)
  g <- function(x, flist) {vapply(flist, f, x, FUN.VALUE = numeric(1))}
  df <- as.data.frame(lapply(x, g, flist))
  row.names(df) <- names(flist)
  df
}
Suppose we have a data frame of about 0.4 Gb:
n <- 10^7
df <- data.frame(x1 = rnorm(n, 10, 1), x2 = rweibull(n, 1, 2), x3 = rpois(n, 100),
                 x4 = rnorm(n, 10, 1), x5 = rweibull(n, 1, 2), x6 = rpois(n, 100))
print(object.size(df), units = "Gb")
## 0.4 Gb
We may use my_summary()
system.time({
  my_summary(df, flist = list(mean = mean, stdev = sd,
                              range = function(x, ...) {sd(x, ...) / mean(x, ...)})
  )})
##    user  system elapsed 
##   1.788   0.066   1.858
or write a parallel version of this function with minor modifications to the original my_summary():
my_mc_summary <- function(x, flist, mc.cores = 1L) {
  f <- function(f, ...) f(...)
  g <- function(x, flist) {vapply(flist, f, x, FUN.VALUE = numeric(1))}
  df <- as.data.frame(mclapply(x, g, flist, mc.cores = mc.cores))
  row.names(df) <- names(flist)
  df
}
and use it as
system.time({
  my_mc_summary(df, flist = list(mean = mean, stdev = sd,
                range = function(x, ...) {sd(x, ...) / mean(x, ...)}),
                mc.cores = 2L
  )})
##    user  system elapsed 
##   0.555   0.082   0.684
The reduction in computing time is appreciable, at very little coding effort.
Finally, my_mc_summary(..., mc.cores = 1L) will run on a single core, behaving exactly like my_summary().
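A quick check of this claim (a sketch, reusing the flist from above):

# Sketch: with mc.cores = 1 the parallel version should reproduce the
# sequential my_summary() exactly.
flist <- list(mean = mean, stdev = sd,
              range = function(x, ...) {sd(x, ...) / mean(x, ...)})
identical(my_summary(df, flist), my_mc_summary(df, flist, mc.cores = 1L))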
Prescheduling
The mc.preschedule argument of mclapply() controls how data are allocated to processes; it is set to TRUE by default.
If mc.preschedule is TRUE, the input is divided up front into (at most) mc.cores sections, and each section is handled by one process.
If mc.preschedule is FALSE, a separate job is forked for each value of X, with at most mc.cores jobs running at any time.
Setting mc.preschedule to TRUE is better for short computations or a large number of values in X, while setting it to FALSE is better for jobs with a high variance of completion time and not too many values of X.
A convolution is a good example where mc.preschedule should be left at TRUE. Here we pass n = 10^4 values of X to mclapply(); with mc.preschedule = FALSE a separate process is forked for each value, and the overhead becomes dominant:
lambda <- 1000
n <- 10^4
f <- function(i, np, lambda, meanlog, sdlog) {
  sum(rlnorm(np[i], meanlog, sdlog))
}

# mc.preschedule = FALSE: one child process is forked per element of X
system.time({
  np <- rpois(n, lambda)
  out <- mclapply(X = 1:n, FUN = f,
                  np = np, meanlog = 9, sdlog = 2,
                  mc.preschedule = FALSE)
})
##    user  system elapsed 
##   19.02  207.41  107.58
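For comparison, a sketch of the same job with prescheduling enabled (the default); timings will of course vary by machine, but only mc.cores child processes are forked instead of 10^4:

# mc.preschedule = TRUE: the 10^4 elements are split into mc.cores blocks
# up front, so only a handful of child processes are created.
system.time({
  out <- mclapply(X = 1:n, FUN = f,
                  np = np, meanlog = 9, sdlog = 2,
                  mc.preschedule = TRUE)
})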
With explicit parallelism the programmer is responsible for dividing the problem into independent chunks to be run in parallel, and for aggregating the results of each chunk.
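The pattern boils down to three steps, sketched here on a toy task (made-up numbers, not one of the examples above):

# Explicit parallelism in a nutshell: divide, run, aggregate.
library(parallel)
chunks  <- split(1:10^4, rep(1:2, each = 5 * 10^3))       # divide the work
partial <- mclapply(chunks, function(ix) sum(sqrt(ix)),
                    mc.cores = 2L)                         # run the chunks
total   <- Reduce(`+`, partial)                            # aggregate results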
Cluster parallelism
While mclapply() works on the basis of process forking, hiding from the user the need to manage the underlying parallel backend, R also offers functionality for creating a set of R copies running in parallel and communicating over sockets. This method requires the user to set up the cluster prior to using it but, on the other hand, allows parallel computation to be extended over several machines.
We can easily define nc “workers” on a single computer by calling makeCluster(nc):
library(parallel)
nc <- detectCores()
cluster <- makeCluster(nc)
cbind(clusterCall(cluster, function() {Sys.info()["nodename"]}))
##      [,1]
## [1,] "mutolo"
## [2,] "mutolo"
## [3,] "mutolo"
## [4,] "mutolo"
stopCluster(cluster)
clusterCall() calls a function on each node, whereas stopCluster(cluster) closes the cluster parallel environment previously created.
Notice that, when a parallel computation environment is created with makeCluster(), a “master” process and a number of worker (slave) processes are run. The role of the master process is to manage the worker processes and join their results, while the worker processes actually perform the computations.
Thus, after the makeCluster(nc) call in the previous script, nc + 1 R processes are running: one master process and nc worker processes.
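We can peek at the processes involved (a sketch; the PIDs will obviously differ from run to run):

# Each worker is a separate R process: clusterCall() returns one PID per
# worker, while Sys.getpid() in the session gives the master's PID.
library(parallel)
cluster <- makeCluster(2)
clusterCall(cluster, Sys.getpid)   # two worker PIDs
Sys.getpid()                       # the master process: nc + 1 = 3 in total
stopCluster(cluster)

Going back to clusterCall(), arguments can be passed to the function evaluated on each worker: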
fx <- function(x) {x + 1}
cluster <- makeCluster(2)
clusterCall(cluster, fx, x = 5)
## [[1]]
## [1] 6
##
## [[2]]
## [1] 6
stopCluster(cluster)
Variables defined at the master level are not directly available to the slaves. Therefore, while this example works:
fxx <- function(x) {x + xx}
xx <- 3
fxx(1)
## [1] 4
the following will fail:
cluster <- makeCluster(2)
clusterCall(cluster, fxx, x = 2)
## Error: 2 nodes produced errors; first error: object 'xx' not found
stopCluster(cluster)
Whenever required, we will have to export master variables to all slaves:
xx <- 1
cluster <- makeCluster(2)
clusterExport(cluster, "xx")
clusterCall(cluster, fxx, x = 2)
## [[1]]
## [1] 3
##
## [[2]]
## [1] 3
stopCluster(cluster)
Similarly, we have to attach any required libraries at the slave level. The function clusterEvalQ(), which evaluates an expression on each cluster node, is the ideal candidate for this task:
cluster <- makeCluster(2)
clusterEvalQ(cluster, library(MASS))
## [[1]]
## [1] "MASS"      "methods"   "stats"     "graphics"  "grDevices" "utils"
## [7] "datasets"  "base"
##
## [[2]]
## [1] "MASS"      "methods"   "stats"     "graphics"  "grDevices" "utils"
## [7] "datasets"  "base"
stopCluster(cluster)
When calling makeCluster(), if no type argument is supplied, it defaults to type = "PSOCK", which calls makePSOCKcluster().
makePSOCKcluster() is very similar to makeSOCKcluster() in package snow: it runs Rscript on the specified host(s) to set up a worker process that listens on a socket for expressions to evaluate and returns the results.
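In other words, the two calls sketched below are roughly equivalent ways of starting two local socket workers (the host names here are simply "localhost" placeholders):

# Two ways of creating a two-worker PSOCK cluster on the local machine.
library(parallel)
cl1 <- makeCluster(2)                                 # type defaults to "PSOCK"
cl2 <- makePSOCKcluster(c("localhost", "localhost"))  # explicit host list
stopCluster(cl1)
stopCluster(cl2)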
A simple convolution based on sapply() represents a good example for cluster parallelism. We can test the standalone (sequential) convolution with:
lambda <- 1000
n <- 10^5
f <- function(i, np, lambda, meanlog, sdlog) {
  sum(rlnorm(np[i], meanlog, sdlog))
}

system.time({
  np <- rpois(n, lambda)
  out <- sapply(X = 1:n, FUN = f,
                np = np, meanlog = 9, sdlog = 2)
  cat("95th quantile = ", quantile(out, .95), "\n")
})
## 95th quantile =  81480802
##    user  system elapsed 
##  13.310   0.091  13.414
We just need to replace the sapply() call with the corresponding parSapply() to make use of R's parallel capabilities.
nc <- detectCores()
cluster <- makeCluster(nc)
f <- function(i, np, lambda, meanlog, sdlog) {
  sum(rlnorm(np[i], meanlog, sdlog))
}

system.time({
  np <- rpois(n, lambda)
  out <- parSapply(cl = cluster, X = 1:n, FUN = f,
                   np = np, meanlog = 9, sdlog = 2)
  cat("95th quantile = ", quantile(out, .95), "\n")
})
## 95th quantile =  81401522
##    user  system elapsed 
##   0.109   0.005   4.462
stopCluster(cluster)
Setting up a cluster
To run the same procedure in parallel on several machines, a proper network environment must be set up with public keys and hosts files.
The following procedure has to be executed only once in order to configure the network environment and set up all necessary permissions.
- All computers require an ssh server and ssh client installed
- The master must be able to communicate with the slaves via ssh keys
- The master must be able to communicate with itself via ssh keys
- The master must be able to communicate with the slaves over a given port in the range 11000:11999
- Host names must be properly set up in /etc/hosts
Note that we can generate an ssh key with ssh-keygen -t rsa and copy the key to the remote machine with ssh-copy-id user@remote.
Unfortunately, when creating a snow (or parallel) cluster object, many things can go wrong, and the most common failure mode is to hang indefinitely. In addition to ssh issues, the problem could be:
- R not installed on a worker machine
- snow not installed on a worker machine
- R or snow not installed in the same location as on the local machine
- the current user doesn’t exist on a worker machine
- a networking problem
- a firewall problem
and there are no doubt more possibilities.
In our experience, the single most useful troubleshooting technique is manual mode. Just set “manual” to TRUE when creating the cluster object. It’s also a good idea to set “outfile” to the empty string so that you’re more likely to see useful error messages:
cl <- makeSOCKcluster(ip_slave, manual = TRUE, outfile = "")
makeSOCKcluster() will display an Rscript command to execute in a terminal on the specified machine. Obviously, this bypasses any ssh issues, and you will quickly learn if R or snow is not installed in the expected location. If we are lucky, we will get an error message, and that will lead us to the solution.
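The same trick is available through package parallel; the sketch below uses a placeholder worker address:

# Manual-mode sketch with parallel::makeCluster(): R prints the Rscript
# command to run by hand on the worker; outfile = "" sends worker output
# to the master console, which helps when hunting for error messages.
library(parallel)
cl <- makeCluster("192.168.0.2", manual = TRUE, outfile = "")
# ... run the printed command in a terminal on the worker, then use cl ...
stopCluster(cl)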
Example: Cluster convolution
ip_master <- "localhost"
ip_slave <- "localhost"
biCluster <- makeCluster(spec = c(rep(ip_master, 2), rep(ip_slave, 2)),
                         port = 11001)
cbind(clusterCall(biCluster, function() {Sys.info()["nodename"]}))
##      [,1]
## [1,] "mutolo"
## [2,] "mutolo"
## [3,] "mutolo"
## [4,] "mutolo"
stopCluster(biCluster)
As the results show, the above instructions created a parallel computational environment with two worker processes on the master and two on the slave (here, for illustration, both addresses point to localhost). Now the convolution exercise can easily be divided among four workers on two networked machines.
biCluster <- makeCluster(spec = c(rep(ip_master, 2), rep(ip_slave, 2)))
lambda <- 1000
n <- 10^6
f <- function(i, np, lambda, meanlog, sdlog) {
  sum(rlnorm(np[i], meanlog, sdlog))
}

system.time({
  np <- rpois(n, lambda)
  out <- parSapply(cl = biCluster, X = 1:n, FUN = f,
                   np = np, meanlog = 9, sdlog = 2)
  cat("95th quantile = ", quantile(out, 0.95), "\n")
})
## 95th quantile =  81504374
##    user  system elapsed 
##   1.145   0.036  38.850
stopCluster(biCluster)
Finally, working across public networks, especially with little bandwidth, may easily kill the extra benefit of having multiple cores available.
Package Foreach
When loading the foreach package, R displays:
library(foreach)
foreach: simple, scalable parallel programming from Revolution Analytics
Use Revolution R for scalability, fault tolerance and more.
http://www.revolutionanalytics.com
And in fact foreach is an R package developed at Revolution Analytics, the major commercial supplier of R-based solutions.
Package foreach provides a new looping construct for executing R code repeatedly.
library(foreach)
foreach(i = 1:3) %do% log(i)
Function foreach() is similar to the standard lapply() function, but it does not require wrapping the body of the loop in a function. Moreover, foreach() has some interesting functionality, such as the .combine argument, that makes it a very versatile iterator.
The .combine argument combines the results after the execution of the loop:
foreach(i = 1:3, .combine = cbind) %do% sample(1:5, 5)
In this case, foreach returns a matrix, since that is what cbind() produces.
The main reason for using package foreach is that it supports parallel execution, that is, it can execute those repeated operations on multiple cores, or on multiple nodes of a cluster.
Changing from the single-core to the multi-core version of a foreach loop is quite a simple task:
library(doMC)
registerDoMC(cores = 2)
result <- foreach(i = 1:100) %dopar% log(i)
Clearly, with this example, there is no advantage in using the parallel version of the foreach iterator; it is shown only for teaching purposes.
Note that foreach needs a parallel backend to be able to run in %dopar% mode. The parallel backend, in this case, is provided by the function registerDoMC(), which registers the multicore parallel backend with the foreach package.
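Putting the pieces together, a complete (sketched) workflow on a Unix-like machine, assuming the foreach and doMC packages are installed, looks like this:

# Register a two-core multicore backend, then run the loop in parallel;
# .combine = c collapses the list of results into a single vector.
library(foreach)
library(doMC)
registerDoMC(cores = 2)
res <- foreach(i = 1:8, .combine = c) %dopar% sqrt(i)
res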