The R community has developed several packages to take advantage of parallelism.

Many of these packages are simply wrappers around one or more other parallelism packages, forming a complex and sometimes confusing web of packages.

Package parallel attempts to eliminate some of this confusion by wrapping snow and multicore into a single bundle.

Parallel computation is especially suited to “embarrassingly parallel” problems like large-scale simulations and by-group analyses.

Package Parallel

Parallel computation can be divided into explicit and implicit parallelism.

Multicore parallelism

The multicore package, bundled within parallel, provides a way of running parallel computations in R on machines with multiple cores or CPUs by making use of operating system facilities.

At present, multicore, being based on process forking, works only on Unix-like systems.

Function mclapply() works just like the regular lapply() function to iterate across the elements of a list, but iterations automatically run in parallel to speed up the computation.

Example: 2-dimensional function

We want to calculate the 2-dimensional function f() on a grid over [-10, 10] consisting of 1,000 points, where f() is defined as:
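The definition itself is not reproduced in this text; as a hypothetical stand-in for the sketches below, any smooth function of two variables will do:

    # Hypothetical stand-in for the original f(); any scalar function of
    # two variables works for the timing experiments that follow
    f <- function(x, y) sin(sqrt(x^2 + y^2)) / (sqrt(x^2 + y^2) + 1e-9)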

We simulate some test data:
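A minimal sketch, assuming 1,000 points per axis:

    xs <- seq(-10, 10, length.out = 1000)
    grid <- expand.grid(x = xs, y = xs)   # one row per (x, y) pair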

The sequential calculation simply calls function f() for each row of the grid:
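In sketch form:

    # Sequential version: one call of f() per row of the grid
    system.time(
      z <- apply(grid, 1, function(row) f(row[1], row[2]))
    )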

The calculation takes about 20 seconds. To check the correctness of the calculation, we plot the function:
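For instance, using image():

    # Visual check; image() wants z arranged as a matrix
    image(xs, xs, matrix(z, nrow = length(xs)), xlab = "x", ylab = "y")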

[Figure: plot of chunk parallel-004]

A strategy for a parallel computation of function f() is to split the grid into subgrids and calculate the function for these subgrids in parallel.

Here two cores are available, so the grid is split into two parts along y = 0:
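A possible reconstruction of the split:

    # Two sub-grids, one per core, split along y = 0
    grid_list <- split(grid, grid$y >= 0)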

Now the apply() call from the sequential calculation is executed for each element of the list. A sequential execution is done using lapply():
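Along the lines of:

    system.time(
      res_seq <- lapply(grid_list,
                        function(g) apply(g, 1, function(row) f(row[1], row[2])))
    )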

The parallel execution is done using the parallel analogue, mclapply():
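A minimal sketch:

    library(parallel)
    system.time(
      res_par <- mclapply(grid_list,
                          function(g) apply(g, 1, function(row) f(row[1], row[2])),
                          mc.cores = 2)
    )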

The parallel execution needs about half the time. In the background, mclapply() has forked two child processes, and each of them has calculated the result for one list element. In principle, mclapply() produces the following results:

[Figure: plot of chunk parallel-008]

For the final result the elements of the list must be put together:
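For example:

    # Concatenate the two sub-results and compare with the sequential run
    z_par <- unlist(res_par, use.names = FALSE)
    all.equal(unname(z), z_par)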

Example: Cross Validation

We can define a simple cross validation function for lm type objects as follows:
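A minimal sketch of such a function (the name cross_val() comes from the text; the leave-one-out logic is an assumption):

    # Hypothetical cross_val(): refit the lm without observation i and
    # return the squared error of predicting that observation
    cross_val <- function(i, formula, data) {
      fit  <- lm(formula, data = data[-i, ])
      pred <- predict(fit, newdata = data[i, , drop = FALSE])
      (data[[all.vars(formula)[1]]][i] - pred)^2
    }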

We create a 1,000-row data frame and fit a linear regression model to it:
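For instance:

    set.seed(123)
    df  <- data.frame(x = rnorm(1000))
    df$y <- 1 + 2 * df$x + rnorm(1000)   # simulated linear relationship
    fit <- lm(y ~ x, data = df)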

[Figure: plot of chunk dataframeDf]

The previously created cross_val() function can be iterated through lapply() using a single core:
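In sketch form:

    system.time(
      cv_seq <- lapply(seq_len(nrow(df)), cross_val, formula = y ~ x, data = df)
    )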

[Figure: plot of chunk crossValsingleCore]

By using mclapply(), parallelism is achieved straightforwardly without the need to set up an explicit cluster environment:
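A minimal sketch:

    system.time(
      cv_par <- mclapply(seq_len(nrow(df)), cross_val,
                         formula = y ~ x, data = df, mc.cores = 2)
    )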

Clearly, both methods lead to the same results:
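For example:

    identical(cv_seq, cv_par)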

Summary statistics revisited

We can revisit the previous example about a generic summary function:
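The original my_summary() is not shown here; a plausible version computes a few statistics for every column:

    # Hypothetical my_summary(): a handful of statistics per column
    my_summary <- function(data) {
      lapply(data, function(col)
        c(mean = mean(col), sd = sd(col), min = min(col), max = max(col)))
    }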

Suppose we have a data frame of about 0.4 GB:
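For instance:

    # Roughly 0.4 GB: 10^6 rows x 50 numeric columns x 8 bytes each
    big_df <- as.data.frame(matrix(rnorm(5e7), ncol = 50))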

We may use my_summary():
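    # For example (timing will depend on the machine)
    system.time(s1 <- my_summary(big_df))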

or write a parallel version of this function with minor modifications to the original my_summary():
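A sketch, swapping lapply() for mclapply():

    my_mc_summary <- function(data, mc.cores = 2L) {
      mclapply(data, function(col)
        c(mean = mean(col), sd = sd(col), min = min(col), max = max(col)),
        mc.cores = mc.cores)
    }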

and use it as:
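    # Sketch, mirroring the sequential call above
    system.time(s2 <- my_mc_summary(big_df, mc.cores = 2L))
    identical(s1, s2)   # same results as my_summary()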

The reduction in computing time is appreciable at very little coding effort.

Finally, my_mc_summary(..., mc.cores = 1L) will run the multicore function on a single core, just like my_summary().

Prescheduling

Argument mc.preschedule of mclapply() controls how data are allocated to processes and is set to TRUE by default.

If mc.preschedule is TRUE, the input is divided a priori into as many sections as there are cores, and each section is passed to its own process.

If mc.preschedule is FALSE, a separate job is constructed for each element of X sequentially, with at most mc.cores jobs running at a time.

mc.preschedule = TRUE is better for short computations or a large number of values in X, while mc.preschedule = FALSE is better for jobs that have a high variance of completion time and not too many values of X.

A convolution is a good example where mc.preschedule definitely needs to be set to TRUE: we pass X with n = 10^4 elements to mclapply(), and each individual task is very short.
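A sketch of such an experiment (the direct convolution kernel here is illustrative):

    # 2n - 1 short, uniform tasks: the prescheduled default forks once per
    # core instead of once per task
    n <- 10^4
    a <- runif(n); b <- runif(n)
    conv_elem <- function(k) {
      i <- max(1, k - n + 1):min(k, n)
      sum(a[i] * b[k - i + 1])
    }
    system.time(mclapply(seq_len(2 * n - 1), conv_elem,
                         mc.cores = 2, mc.preschedule = TRUE))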

With explicit parallelism the programmer is responsible for dividing the problem into independent chunks to be run in parallel, and for aggregating the results from each chunk.

Cluster parallelism

While mclapply() works on the basis of process forking, abstracting away from the user the need to manage the underlying parallel backend, R offers further functionality for creating a set of copies of R running in parallel and communicating over sockets.

This method requires the user to set up the cluster prior to using it but, on the other hand, allows parallel computation to be extended over several machines.

We can easily define nc “workers” on a single computer by calling makeCluster(nc):
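For example:

    library(parallel)
    nc <- 2                     # number of workers on this machine
    cl <- makeCluster(nc)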

clusterCall() calls a function on each node, whereas stopCluster(cluster) closes the cluster parallel environment previously created.
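For instance:

    clusterCall(cl, Sys.getpid)   # one result per worker
    stopCluster(cl)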

Notice that when a parallel computation environment is created with makeCluster(), a “master” process and a number of worker (or slave) processes are run. The role of the master process is to manage the worker processes and join the results, while the slave processes actually perform the computations.

Then, after the makeCluster(nc) call in the previous script, nc + 1 R processes will run: one master process and nc worker processes.

Variables defined at master level are not directly available to all slaves. Therefore, while this example works:
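A minimal illustration (the values are hypothetical):

    cl <- makeCluster(2)
    x <- 5
    # Works: x reaches the workers as an argument of the call
    clusterCall(cl, function(y) y + 1, x)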

the following will fail:
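    # Fails: the body refers to x, which does not exist on the workers;
    # every node reports that object 'x' was not found
    clusterCall(cl, function() x + 1)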

Whenever required, we’ll have to export master variables to all slaves:
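    clusterExport(cl, "x")              # copy x from the master to each worker
    clusterCall(cl, function() x + 1)   # now succeeds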

Similarly, we have to attach our required libraries at slave level. The function clusterEvalQ(), as it evaluates an expression at each cluster node, is the ideal candidate to achieve this task:
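For example (MASS is just an illustrative package):

    clusterEvalQ(cl, library(MASS))   # attach the package on every worker
    stopCluster(cl)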

When calling makeCluster(), if no type argument is supplied, it defaults to type = "PSOCK", which calls makePSOCKcluster().

makePSOCKcluster is very similar to makeSOCKcluster in package snow. It runs Rscript on the specified host(s) to set up a worker process which listens on a socket for expressions to evaluate, and returns the results.

A simple convolution based on sapply() represents a good example for cluster parallelism. We can test the standalone convolution by:
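A sketch, using a direct O(n^2) implementation:

    # Sequential convolution of two vectors via sapply()
    conv <- function(a, b) {
      n <- length(a); m <- length(b)
      sapply(seq_len(n + m - 1), function(k) {
        i <- max(1, k - m + 1):min(k, n)
        sum(a[i] * b[k - i + 1])
      })
    }
    a <- runif(10^4); b <- runif(10^4)
    system.time(r_seq <- conv(a, b))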

We just need to replace sapply() with the corresponding parSapply() to make use of R’s parallel capabilities:
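A possible parallel counterpart (conv_par() is a name made up here):

    cl <- makeCluster(2)
    conv_par <- function(cl, a, b) {
      n <- length(a); m <- length(b)
      parSapply(cl, seq_len(n + m - 1), function(k, a, b, n, m) {
        i <- max(1, k - m + 1):min(k, n)
        sum(a[i] * b[k - i + 1])
      }, a = a, b = b, n = n, m = m)
    }
    system.time(r_par <- conv_par(cl, a, b))
    all.equal(r_seq, r_par)
    stopCluster(cl)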

Setting up a cluster

To run the same procedure in parallel on several machines, a proper network environment must be set up with public keys and hosts files.

The following procedure has to be executed only once in order to configure the network environment and set up all necessary permissions.

  1. All computers require an ssh server and an ssh client installed
  2. Master able to communicate with the slaves via ssh keys
  3. Master able to communicate with itself via ssh keys
  4. Master able to communicate with the slaves over a given port in the range 11000:11999
  5. Host names properly set up in /etc/hosts

Note that we can generate an ssh key with ssh-keygen -t rsa and copy it to the remote machine with ssh-copy-id user@remote.

Unfortunately, when creating a snow (or parallel) cluster object, many things can go wrong, and the most common failure mode is to hang indefinitely. In addition to ssh issues, the problem could be:

  • R not installed on a worker machine
  • snow not installed on the worker machine
  • R or snow not installed in the same location as the local machine
  • current user doesn’t exist on a worker machine
  • networking problem
  • firewall problem

and there are no doubt more possibilities.

In our experience, the single most useful troubleshooting technique is manual mode. Just set “manual” to TRUE when creating the cluster object. It’s also a good idea to set “outfile” to the empty string so that you’re more likely to see useful error messages:
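For instance, with a hypothetical host "worker1":

    library(snow)
    # The master prints the Rscript command to be started by hand on worker1
    cl <- makeSOCKcluster("worker1", manual = TRUE, outfile = "")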

makeSOCKcluster() will display an Rscript command to execute in a terminal on the specified machine. Obviously, this bypasses any ssh issues, and you will quickly learn if R or snow is not installed in the expected location. If we are lucky, we will get an error message that will lead us to the solution.

Example: Cluster convolution
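A sketch of the set-up this example assumes (the host names are hypothetical):

    library(parallel)
    # Four workers on the master machine plus four on a second machine
    hosts <- c(rep("localhost", 4), rep("remote1", 4))
    cl <- makeCluster(hosts)
    clusterCall(cl, function() Sys.info()["nodename"])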

As the results show, the above instructions create a parallel computation environment with four worker processes on the master machine and four on the slave machine.

Now the convolution exercise can be easily divided among eight cores on two networked machines.
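Reusing the conv_par() sketch from above:

    system.time(r_net <- conv_par(cl, a, b))
    stopCluster(cl)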

Finally, working across public networks, especially with little bandwidth, may easily cancel out the extra benefit of having more cores available.

Package Foreach

When loading the foreach package, R displays a startup message crediting its origin.

And in fact foreach is an R package developed at Revolution Analytics, the major commercial supplier of R-based solutions.

Package foreach provides a new looping construct for executing R code repeatedly.

Function foreach() is similar to the standard function lapply(), but doesn’t require the loop body to be wrapped in a function. Moreover, foreach() has some interesting functionality, such as the .combine argument, that makes it a very versatile iterator.
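For example:

    library(foreach)
    # The loop body is a plain expression, not a function
    foreach(i = 1:3) %do% sqrt(i)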

Argument .combine combines results after the execution of the loop:
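For instance:

    # Each iteration returns a length-3 vector; .combine = cbind binds the
    # four results into a 3 x 4 matrix
    m <- foreach(i = 1:4, .combine = cbind) %do% rnorm(3, mean = i)
    dim(m)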

In this case, foreach() outputs a matrix, since cbind() returns one.

The main reason for using package foreach is that it supports parallel execution, that is, it can execute those repeated operations on multiple cores, or on multiple nodes of a cluster.

Changing from the single-core to the multicore version of a foreach loop is quite a simple task:
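A minimal sketch:

    library(doMC)               # multicore backend for foreach
    registerDoMC(cores = 2)
    # The only change: %do% becomes %dopar%
    m <- foreach(i = 1:4, .combine = cbind) %dopar% rnorm(3, mean = i)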

Clearly, with this example, no advantage exists when using the parallel version of the foreach iterator. This is only for teaching purposes.

Note that foreach needs a parallel backend to be able to run in %dopar% mode. The parallel backend, in this case, is provided by function registerDoMC(). This function is used to register the multicore parallel backend with the foreach package.