Tài liệu Parallel R

pdf 122 trang huongle 3400
Bạn đang xem 20 trang mẫu của tài liệu "Tài liệu Parallel R", để tải tài liệu gốc về máy bạn click vào nút DOWNLOAD ở trên

Tài liệu đính kèm:

  • pdftai_lieu_parallel_r.pdf

Nội dung text: Tài liệu Parallel R

  1. www.it-ebooks.info
  2. www.it-ebooks.info
  3. Parallel R Q. Ethan McCallum and Stephen Weston Beijing • Cambridge • Farnham • Kửln • Sebastopol • Tokyo www.it-ebooks.info
  4. Parallel R by Q. Ethan McCallum and Stephen Weston Copyright â 2012 Q. Ethan McCallum and Stephen Weston. All rights reserved. Printed in the United States of America. Published by O’Reilly Media, Inc., 1005 Gravenstein Highway North, Sebastopol, CA 95472. O’Reilly books may be purchased for educational, business, or sales promotional use. Online editions are also available for most titles ( For more information, contact our corporate/institutional sales department: (800) 998-9938 or corporate@oreilly.com. Editors: Mike Loukides and Meghan Blanchette Cover Designer: Karen Montgomery Production Editor: Kristen Borg Interior Designer: David Futato Proofreader: O’Reilly Production Services Illustrator: Robert Romano Revision History for the First Edition: 2011-10-21 First release See for release details. Nutshell Handbook, the Nutshell Handbook logo, and the O’Reilly logo are registered trademarks of O’Reilly Media, Inc. Parallel R, the image of a rabbit, and related trade dress are trademarks of O’Reilly Media, Inc. Many of the designations used by manufacturers and sellers to distinguish their products are claimed as trademarks. Where those designations appear in this book, and O’Reilly Media, Inc., was aware of a trademark claim, the designations have been printed in caps or initial caps. While every precaution has been taken in the preparation of this book, the publisher and authors assume no responsibility for errors or omissions, or for damages resulting from the use of the information con- tained herein. ISBN: 978-1-449-30992-3 [LSI] 1319202138 www.it-ebooks.info
  5. Table of Contents Preface vii 1. Getting Started 1 Why R? 1 Why Not R? 1 The Solution: Parallel Execution 2 A Road Map for This Book 2 What We’ll Cover 3 Looking Forward 3 What We’ll Assume You Already Know 3 In a Hurry? 4 snow 4 multicore 4 parallel 4 R+Hadoop 4 RHIPE 5 Segue 5 Summary 5 2. snow 7 Quick Look 7 How It Works 7 Setting Up 8 Working with It 9 Creating Clusters with makeCluster 9 Parallel K-Means 10 Initializing Workers 12 Load Balancing with clusterApplyLB 13 Task Chunking with parLapply 15 Vectorizing with clusterSplit 18 Load Balancing Redux 20 iii www.it-ebooks.info
  6. Functions and Environments 23 Random Number Generation 25 snow Configuration 26 Installing Rmpi 29 Executing snow Programs on a Cluster with Rmpi 30 Executing snow Programs with a Batch Queueing System 32 Troubleshooting snow Programs 33 When It Works 35 And When It Doesn’t 36 The Wrap-up 36 3. multicore . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 37 Quick Look 37 How It Works 38 Setting Up 38 Working with It 39 The mclapply Function 39 The mc.cores Option 39 The mc.set.seed Option 40 Load Balancing with mclapply 42 The pvec Function 42 The parallel and collect Functions 43 Using collect Options 44 Parallel Random Number Generation 46 The Low-Level API 47 When It Works 49 And When It Doesn’t 49 The Wrap-up 49 4. parallel . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51 Quick Look 52 How It Works 52 Setting Up 52 Working with It 53 Getting Started 53 Creating Clusters with makeCluster 54 Parallel Random Number Generation 55 Summary of Differences 57 When It Works 58 And When It Doesn’t 58 The Wrap-up 58 iv | Table of Contents www.it-ebooks.info
  7. 5. A Primer on MapReduce and Hadoop 59 Hadoop at Cruising Altitude 59 A MapReduce Primer 60 Thinking in MapReduce: Some Pseudocode Examples 61 Calculate Average Call Length for Each Date 62 Number of Calls by Each User, on Each Date 62 Run a Special Algorithm on Each Record 63 Binary and Whole-File Data: SequenceFiles 63 No Cluster? No Problem! Look to the Clouds 64 The Wrap-up 66 6. R+Hadoop 67 Quick Look 67 How It Works 67 Setting Up 68 Working with It 68 Simple Hadoop Streaming (All Text) 69 Streaming, Redux: Indirectly Working with Binary Data 72 The Java API: Binary Input and Output 74 Processing Related Groups (the Full Map and Reduce Phases) 79 When It Works 83 And When It Doesn’t 83 The Wrap-up 84 7. RHIPE 85 Quick Look 85 How It Works 85 Setting Up 86 Working with It 87 Phone Call Records, Redux 87 Tweet Brevity 91 More Complex Tweet Analysis 96 When It Works 98 And When It Doesn’t 99 The Wrap-up 100 8. Segue 101 Quick Look 101 How It Works 102 Setting Up 102 Working with It 102 Model Testing: Parameter Sweep 102 When It Works 105 Table of Contents | v www.it-ebooks.info
  8. And When It Doesn’t 105 The Wrap-up 106 9. New and Upcoming . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 107 doRedis 107 RevoScale R and RevoConnectR (RHadoop) 108 cloudNumbers.com 108 vi | Table of Contents www.it-ebooks.info
  9. Preface Conventions Used in This Book The following typographical conventions are used in this book: Italic Indicates new terms, URLs, email addresses, filenames, and file extensions. Constant width Used for program listings, as well as within paragraphs to refer to program elements such as variable or function names, databases, data types, environment variables, statements, and keywords. Constant width bold Shows commands or other text that should be typed literally by the user. Constant width italic Shows text that should be replaced with user-supplied values or by values deter- mined by context. This icon signifies a tip, suggestion, or general note. This icon indicates a warning or caution. Using Code Examples This book is here to help you get your job done. In general, you may use the code in this book in your programs and documentation. You do not need to contact us for permission unless you’re reproducing a significant portion of the code. For example, writing a program that uses several chunks of code from this book does not require permission. Selling or distributing a CD-ROM of examples from O’Reilly books does vii www.it-ebooks.info
  10. require permission. Answering a question by citing this book and quoting example code does not require permission. Incorporating a significant amount of example code from this book into your product’s documentation does require permission. We appreciate, but do not require, attribution. An attribution usually includes the title, author, publisher, and ISBN. For example: “Parallel R by Q. Ethan McCallum and Stephen Weston (O'Reilly). Copyright 2012 Q. Ethan McCallum and Stephen Weston, 978-1-449-30992-3.” If you feel your use of code examples falls outside fair use or the permission given above, feel free to contact us at permissions@oreilly.com. Safariđ Books Online Safari Books Online is an on-demand digital library that lets you easily search over 7,500 technology and creative reference books and videos to find the answers you need quickly. With a subscription, you can read any page and watch any video from our library online. Read books on your cell phone and mobile devices. Access new titles before they are available for print, and get exclusive access to manuscripts in development and post feedback for the authors. Copy and paste code samples, organize your favorites, down- load chapters, bookmark key sections, create notes, print out pages, and benefit from tons of other time-saving features. O’Reilly Media has uploaded this book to the Safari Books Online service. To have full digital access to this book and others on similar topics from O’Reilly and other pub- lishers, sign up for free at How to Contact Us Please address comments and questions concerning this book to the publisher: O’Reilly Media, Inc. 1005 Gravenstein Highway North Sebastopol, CA 95472 800-998-9938 (in the United States or Canada) 707-829-0515 (international or local) 707-829-0104 (fax) We have a web page for this book, where we list errata, examples, and any additional information. You can access this page at: To comment or ask technical questions about this book, send email to: bookquestions@oreilly.com viii | Preface www.it-ebooks.info
  11. For more information about our books, courses, conferences, and news, see our website at Find us on Facebook: Follow us on Twitter: Watch us on YouTube: Acknowledgments There are only two names on the cover, but a host of people made this book possible. We would like to thank the entire O’Reilly team for their efforts. They provided such a smooth process that we were able to focus on just the writing. A special thanks goes to our editors, Mike Loukides and Meghan Blanchette, for their guidance and support. We would also like to thank our review team. The following people generously dedi- cated their time and energy to read this book in its early state, and their feedback helped shape the text into the finished product you’re reading now: Robert Bjornson Nicholas Carriero Jonathan Seidman Paul Teetor Ramesh Venkataramaiah Jed Wing Any errors you find in this book belong to us, the authors. Most of all we thank you, the reader, for your interest in this book. We set out to create the guidebook we wish we’d had when we first tried to give R that parallel, distributed boost. R work is research work, best done with minimal distractions. We hope these chapters help you get up to speed quickly, so you can get R to do what you need with minimal detour from the task at hand. Q. Ethan McCallum “You like math? Oh, you need to talk to Mike. Let me introduce you.” I didn’t realize it at the time, but those words were the start of this project. Really. A chance encounter with Mike Loukides led to emails and phone calls and, before I knew it, we’d laid the groundwork for a new book. So first and foremost, a hearty thanks to Betsy and Laurel, who made my connection to Mike. Conversations with Mike led me to my co-author, Steve Weston. I’m pleased and flat- tered that he agreed to join me on this adventure. Thanks as well to the gang at Cafe les Deux Chats, for providing a quiet place to work. Preface | ix www.it-ebooks.info
  12. Stephen Weston This was my first book project, so I’d like to thank my co-author and editors for putting up with my freshman confusion and mistakes. They were very gracious throughout the project. I’m very grateful to Nick, Rob, and Jed for taking the time to read my chapters and help me not to make a fool of myself. I also want to thank my wife Diana and daughter Erica for proofreading material that wasn’t on their preferred reading lists. Finally, I’d like to thank all the authors of the packages that we discuss in this book. I had a lot of fun reading the source for all three of the packages that I wrote about. In particular, I’ve always loved the snow source code, which I studied when first learning to program in R. x | Preface www.it-ebooks.info
  13. CHAPTER 1 Getting Started This chapter sets the pace for the rest of the book. If you’re in a hurry, feel free to skip to the chapter you need. (The section “In a Hurry?” on page 4 has a quick-ref look at the various strategies and where they fit. That should help you pick a starting point.) Just make sure you come back here to understand our choice of vocabulary, how we chose what to cover, and so on. Why R? It’s tough to argue with R. Who could dislike a high-quality, cross-platform, open- source statistical software product? It has an interactive console for exploratory work. It can run as a scripting language to repeat a process you’ve captured. It has a lot of statistical calculations built-in so you don’t have to reinvent the wheel. Did we mention that R is free? When the base toolset isn’t enough, R users have access to a rich ecosystem of add-on packages and a gaggle of GUIs to make their lives even easier. No wonder R has become a favorite in the age of Big Data. Since R is perfect, then, we can end this book. Right? Not quite. It’s precisely the Big Data age that has exposed R’s blemishes. Why Not R? These imperfections stem not from defects in the software itself, but from the passage of time: quite simply, R was not built in anticipation of the Big Data revolution. R was born in 1995. Disk space was expensive, RAM even more so, and this thing called The Internet was just getting its legs. Notions of “large-scale data analysis” and “high- performance computing” were reasonably rare. Outside of Wall Street firms and uni- versity research labs, there just wasn’t that much data to crunch. 1 www.it-ebooks.info
  14. Fast-forward to the present day and hardware costs just a fraction of what it used to. Computing power is available online for pennies. Everyone is suddenly interested in collecting and analyzing data, and the necessary resources are well within reach. This surge in data analysis has brought two of R’s limitations to the forefront: it’s single- threaded and memory-bound. Allow us to explain: It’s single-threaded The R language has no explicit constructs for parallelism, such as threads or mu- texes. An out-of-the-box R install cannot take advantage of multiple CPUs. It’s memory-bound R requires that your entire dataset* fit in memory (RAM).† Four gigabytes of RAM will not hold eight gigabytes of data, no matter how much you smile when you ask. While these are certainly inconvenient, they’re hardly insurmountable. The Solution: Parallel Execution People have created a series of workarounds over the years. Doing a lot of matrix math? You can build R against a multithreaded basic linear algebra subprogram (BLAS). Churning through large datasets? Use a relational database or another manual method to retrieve your data in smaller, more manageable pieces. And so on, and so forth. Some big winners involve parallelism. Spreading work across multiple CPUs overcomes R’s single-threaded nature. Offloading work to multiple machines reaps the multi- process benefit and also addresses R’s memory barrier. In this book we’ll cover a few strategies to give R that parallel boost, specifically those which take advantage of mod- ern multicore hardware and cheap distributed computing. A Road Map for This Book Now that we’ve set the tone for why we’re here, let’s take a look at what we plan to accomplish in the coming pages (or screens if you’re reading this electronically). *We emphasize “dataset” here, not necessarily “algorithms.” †It’s a big problem. Because R will often make multiple copies of the same data structure for no apparent reason, you often need three times as much memory as the size of your dataset. And if you don’t have enough memory, you die a slow death as your poor machine swaps and thrashes. Some people turn off virtual memory with the swapoff command so they can die quickly. 2 | Chapter 1: Getting Started www.it-ebooks.info
  15. What We’ll Cover Each chapter is a look into one strategy for R parallelism, including: • What it is • Where to find it • How to use it • Where it works well, and where it doesn’t First up is the snow package, followed by a tour of the multicore package. We then provide a look at the new parallel package that’s due to arrive in R 2.14. After that, we’ll take a brief side-tour to explain MapReduce and Hadoop. That will serve as a foundation for the remaining chapters: R+Hadoop (Hadoop streaming and the Java API), RHIPE, and segue. Looking Forward In Chapter 9, we will briefly mention some tools that were too new for us to cover in- depth. There will likely be other tools we hadn’t heard about (or that didn’t exist) at the time of writing.‡ Please let us know about them! You can reach us through this book’s web- site at What We’ll Assume You Already Know This is a book about R, yes, but we’ll expect you know the basics of how to get around. If you’re new to R or need a refresher course, please flip through Paul Teetor’s R Cook- book (O’Reilly), Robert Kabacoff’s R In Action (Manning), or another introductory title. You should take particular note of the lapply() function, which plays an important role in this book. Some of the topics require several machines’ worth of infrastructure, in which case you’ll need access to a talented sysadmin. You’ll also need hardware, which you can buy and maintain yourself, or rent from a hosting provider. Cloud services, notably Amazon Web Services (AWS), Đ have become a popular choice in this arena. AWS has plenty of documentation, and you can also read Programming Amazon EC2, by Jurg van Vliet and Flavia Paganelli (O’Reilly) as a supplement. (Please note that using a provider still requires a degree of sysadmin knowledge. If you’re not up to the task, you’ll want to find and bribe your skilled sysadmin friends.) ‡ Try as we might, our massive Monte Carlo simulations have brought us no closer to predicting the next R parallelism strategy. Nor any winning lottery numbers, for that matter. Đ A Road Map for This Book | 3 www.it-ebooks.info
  16. In a Hurry? If you’re in a hurry, you can skip straight to the chapter you need. The list below is a quick look at the various strategies. snow Overview: Good for use on traditional clusters, especially if MPI is available. It sup- ports MPI, PVM, nws, and sockets for communication, and is quite portable, running on Linux, Mac OS X, and Windows. Solves: Single-threaded, memory-bound. Pros: Mature, popular package; leverages MPI’s speed without its complexity. Cons: Can be difficult to configure. multicore Overview: Good for big-CPU problems when setting up a Hadoop cluster is too much of a hassle. Lets you parallelize your R code without ever leaving the R interpreter. Solves: Single-threaded. Pros: Simple and efficient; easy to install; no configuration needed. Cons: Can only use one machine; doesn’t support Windows; no built-in support for parallel random number generation (RNG). parallel Overview: A merger of snow and multicore that comes built into R as of R 2.14.0. Solves: Single-threaded, memory-bound. Pros: No installation necessary; has great support for parallel random number generation. Cons: Can only use one machine on Windows; can be difficult to configure on multiple Linux machines. R+Hadoop Overview: Run your R code on a Hadoop cluster. Solves: Single-threaded, memory-bound. Pros: You get Hadoop’s scalability. Cons: Requires a Hadoop cluster (internal or cloud-based); breaks up a single logical process into multiple scripts and steps (can be a hassle for exploratory work). 4 | Chapter 1: Getting Started www.it-ebooks.info
  17. RHIPE Overview: Talk Hadoop without ever leaving the R interpreter. Solves: Single-threaded, memory-bound. Pros: Closer to a native R experience than R+Hadoop; use pure R code for your Map- Reduce operations. Cons: Requires a Hadoop cluster; requires extra setup on the cluster; cannot process standard SequenceFiles (for binary data). Segue Overview: Seamlessly send R apply-like calculations to a remote Hadoop cluster. Solves: Single-threaded, memory-bound. Pros: Abstracts you from Elastic MapReduce management. Cons: Cannot use with an internal Hadoop cluster (you’re tied to Amazon’s Elastic MapReduce). Summary Welcome to the beginning of your journey into parallel R. Our first stop is a look at the popular snow package. Summary | 5 www.it-ebooks.info
  18. www.it-ebooks.info
  19. CHAPTER 2 snow snow (“Simple Network of Workstations”) is probably the most popular parallel pro- gramming package available for R. It was written by Luke Tierney, A. J. Rossini, Na Li, and H. Sevcikova, and is actively maintained by Luke Tierney. It is a mature package, first released on the “Comprehensive R Archive Network” (CRAN) in 2003. Quick Look Motivation: You want to use a Linux cluster to run an R script faster. For example, you’re running a Monte Carlo simulation on your laptop, but you’re sick of waiting many hours or days for it to finish. Solution: Use snow to run your R code on your company or university’s Linux cluster. Good because: snow fits well into a traditional cluster environment, and is able to take advantage of high-speed communication networks, such as InfiniBand, using MPI. How It Works snow provides support for easily executing R functions in parallel. Most of the parallel execution functions in snow are variations of the standard lapply() function, making snow fairly easy to learn. To implement these parallel operations, snow uses a master/ worker architecture, where the master sends tasks to the workers, and the workers execute the tasks and return the results to the master. One important feature of snow is that it can be used with different transport mechanisms to communicate between the master and workers. This allows it to be portable, but still take advantage of high-performance communication mechanisms if available. snow can be used with socket connections, MPI, PVM, or NetWorkSpaces. The socket transport doesn’t require any additional packages, and is the most portable. MPI is supported via the Rmpi package, PVM via rpvm, and NetWorkSpaces via nws. The MPI 7 www.it-ebooks.info
  20. transport is popular on Linux clusters, and the socket transport is popular on multicore computers, particularly Windows computers.* snow is primarily intended to run on traditional clusters and is particularly useful if MPI is available. It is well suited to Monte Carlo simulations, bootstrapping, cross valida- tion, ensemble machine learning algorithms, and K-Means clustering. Good support is available for parallel random number generation, using the rsprng and rlecuyer packages. This is very important when performing simulations, bootstrap- ping, and machine learning, all of which can depend on random number generation. snow doesn’t provide mechanisms for dealing with large data, such as distributing data files to the workers. The input arguments must fit into memory when calling a snow function, and all of the task results are kept in memory on the master until they are returned to the caller in a list. Of course, snow can be used with high-performance distributed file systems in order to operate on large data files, but it’s up to the user to arrange that. Setting Up snow is available on CRAN, so it is installed like any other CRAN package. It is pure R code and almost never has installation problems. There are binary packages for both Windows and Mac OS X. Although there are various ways to install packages from CRAN, I generally use the install.packages() function: install.packages("snow") It may ask you which CRAN mirror to use, and then it will download and install the package. If you’re using an old version of R, you may get a message saying that snow is not available. snow has required R 2.12.1 since version 0.3-5, so you might need to download and install snow 0.3-3 from the CRAN package archives. In your browser, search for “CRAN snow” and it will probably bring you to snow’s download page on CRAN. Click on the “snow archive” link, and then you can download snow_0.3-3.tar.gz. Or you can try directly downloading it from: Once you’ve downloaded it, you can install it from the command line with: % R CMD INSTALL snow_0.3-3.tar.gz You may need to use the -l option to specify a different installation directory if you don’t have permission to install it in the default directory. For help on this command, *The multicore package is generally preferred on multicore computers, but it isn’t supported on Windows. See Chapter 3 for more information on the multicore package. 8 | Chapter 2: snow www.it-ebooks.info
  21. use the help option. For more information on installing R packages, see the section “Installing packages” in the “R Installation and Administration” manual, written by the “R Development Core Team”, and available from the R Project website. As a developer, I always use the most recent version of R. That makes it easier to install packages from CRAN, since packages are only built for the most recent version of R on CRAN. They keep around older binary distributions of packages, but they don’t build new packages or new versions of packages for anything but the current version of R. And if a new version of a package depends on a newer version of R, as with snow, you can’t even build it for yourself on an older version of R. How- ever, if you’re using R for production use, you need to be much more cautious about upgrading to the latest version of R. To use snow with MPI, you will also need to install the Rmpi package. Unfortunately, installing Rmpi is a frequent cause of problems because it has an external dependency on MPI. For more information, see “Installing Rmpi” on page 29. Fortunately, the socket transport can be used without installing any additional pack- ages. For that reason, I suggest that you start by using the socket transport if you are new to snow. Once you’ve installed snow, you should verify that you can load it: library(snow) If that succeeds, you are ready to start using snow. Working with It Creating Clusters with makeCluster In order to execute any functions in parallel with snow, you must first create a cluster object. The cluster object is used to interact with the cluster workers, and is passed as the first argument to many of the snow functions. You can create different types of cluster objects, depending on the transport mechanism that you wish to use. The basic cluster creation function is makeCluster() which can create any type of clus- ter. Let’s use it to create a cluster of four workers on the local machine using the socket transport: cl <- makeCluster(4, type="SOCK") The first argument is the cluster specification, and the second is the cluster type. The interpretation of the cluster specification depends on the type, but all cluster types allow you to specify a worker count. Working with It | 9 www.it-ebooks.info
  22. Socket clusters also allow you to specify the worker machines as a character vector. The following will launch four workers on remote machines: spec <- c("n1", "n2", "n3", "n4") cl <- makeCluster(spec, type="SOCK") The socket transport launches each of these workers via the ssh command† unless the name is “localhost”, in which case makeCluster() starts the worker itself. For remote execution, you should configure ssh to use password-less login. This can be done using public-key authentication and SSH agents, which is covered in chapter 6 of SSH, The Secure Shell: The Definitive Guide (O’Reilly) and many websites. makeCluster() allows you to specify addition arguments as configuration options. This is discussed further in “snow Configuration” on page 26. The type argument can be “SOCK”, “MPI”, “PVM” or “NWS”. To create an MPI cluster with four workers, execute: cl <- makeCluster(4, type="MPI") This will start four MPI workers on the local machine unless you make special provi- sions, as described in the section “Executing snow Programs on a Cluster with Rmpi” on page 30. You can also use the functions makeSOCKcluster(), makeMPIcluster(), makePVMcluster(), and makeNWScluster() to create specific types of clusters. In fact, makeCluster() is noth- ing more than a wrapper around these functions. To shut down any type of cluster, use the stopCluster() function: stopCluster(cl) Some cluster types may be automatically stopped when the R session exits, but it’s good practice to always call stopCluster() in snow scripts; otherwise, you risk leaking cluster workers if the cluster type is changed, for example. Creating the cluster object can fail for a number of reasons, and is there- fore a source of problems. See the section “Troubleshooting snow Pro- grams” on page 33 for help in solving these problems. Parallel K-Means We’re finally ready to use snow to do some parallel computing, so let’s look at a real example: parallel K-Means. K-Means is a clustering algorithm that partitions rows of a dataset into k clusters.‡ It’s an iterative algorithm, since it starts with a guess of the †This can be overridden via the rshcmd option, but the specified command must be command line-compatible with ssh. ‡These clusters shouldn’t be confused with cluster objects and cluster workers. 10 | Chapter 2: snow www.it-ebooks.info
  23. location for each of the cluster centers, and gradually improves the center locations until it converges on a solution. R includes a function for performing K-Means clustering in the stats package: the kmeans() function. One way of using the kmeans() function is to specify the number of cluster centers, and kmeans() will pick the starting points for the centers by randomly selecting that number of rows from your dataset. After it iterates to a solution, it com- putes a value called the total within-cluster sum of squares. It then selects another set of rows for the starting points, and repeats this process in an attempt to find a solution with a smallest total within-cluster sum of squares. Let’s use kmeans() to generate four clusters of the “Boston” dataset, using 100 random sets of centers: library(MASS) result <- kmeans(Boston, 4, nstart=100) We’re going to take a simple approach to parallelizing kmeans() that can be used for parallelizing many similar functions and doesn’t require changing the source code for kmeans(). We simply call the kmeans() function on each of the workers using a smaller value of the nstart argument. Then we combine the results by picking the result with the smallest total within-cluster sum of squares. But before we execute this in parallel, let’s try using this technique using the lapply() function to make sure it works. Once that is done, it will be fairly easy to convert to one of the snow parallel execution functions: library(MASS) results <- lapply(rep(25, 4), function(nstart) kmeans(Boston, 4, nstart=nstart)) i <- sapply(results, function(result) result$tot.withinss) result <- results[[which.min(i)]] We used a vector of four 25s to specify the nstart argument in order to get equivalent results to using 100 in a single call to kmeans(). Generally, the length of this vector should be equal to the number of workers in your cluster when running in parallel. Now let’s parallelize this algorithm. snow includes a number of functions that we could use, including clusterApply(), clusterApplyLB(), and parLapply(). For this example, we’ll use clusterApply(). You call it exactly the same as lapply(), except that it takes a snow cluster object as the first argument. We also need to load MASS on the workers, rather than on the master, since it’s the workers that use the “Boston” dataset. Assuming that snow is loaded and that we have a cluster object named cl, here’s the parallel version: ignore <- clusterEvalQ(cl, {library(MASS); NULL}) results <- clusterApply(cl, rep(25, 4), function(nstart) kmeans(Boston, 4, nstart=nstart)) i <- sapply(results, function(result) result$tot.withinss) result <- results[[which.min(i)]] Working with It | 11 www.it-ebooks.info
  24. clusterEvalQ() takes two arguments: the cluster object, and an expression that is eval- uated on each of the workers. It returns the result from each of the workers in a list, which we don’t use here. I use a compound expression to load MASS and return NULL to avoid sending unnecessary data back to the master process. That isn’t a serious issue in this case, but it can be, so I often return NULL to be safe. As you can see, the snow version isn’t that much different than the lapply() version. Most of the work was done in converting it to use lapply(). Usually the biggest problem in converting from lapply() to one of the parallel operations is handling the data prop- erly and efficiently. In this case, the dataset was in a package, so all we had to do was load the package on the workers. The kmeans() function uses the sample.int() function to choose the starting cluster centers, which depend on the random number genera- tor. In order to get different solutions, the cluster workers need to use different streams of random numbers. Since the workers are randomly seeded when they first start generating random numbers,Đ this example will work, but it is good practice to use a parallel random number gen- erator. See “Random Number Generation” on page 25 for more information. Initializing Workers In the last section we used the clusterEvalQ() function to initialize the cluster workers by loading a package on each of them. clusterEvalQ() is very handy, especially for interactive use, but it isn’t very general. It’s great for executing a simple expression on the cluster workers, but it doesn’t allow you to pass any kind of parameters to the expression, for example. Also, although you can use it to execute a function, it won’t send that function to the worker first,‖ as clusterApply() does. My favorite snow function for initializing the cluster workers is clusterCall(). The ar- guments are pretty simple: it takes a snow cluster object, a worker function, and any number of arguments to pass to the function. It simply calls the function with the specified arguments on each of the cluster workers, and returns the results as a list. It’s like clusterApply() without the x argument, so it executes once for each worker, like clusterEvalQ(), rather than once for each element in x. ĐAll R sessions are randomly seeded when they first generate random numbers, unless they were restored from a previous R session that generated random numbers. snow workers never restore previously saved data, so they are always randomly seeded. ‖ How exactly snow sends functions to the workers is a bit complex, raising issues of execution context and environment. See “Functions and Environments” on page 23 for more information. 12 | Chapter 2: snow www.it-ebooks.info
  25. clusterCall() can do anything that clusterEvalQ() does and more.# For example, here’s how we could use clusterCall() to load the MASS package on the cluster workers: clusterCall(cl, function() { library(MASS); NULL }) This defines a simple function that loads the MASS package and returns NULL.* Returning NULL guarantees that we don’t accidentally send unnecessary data transfer back to the master.† The following will load several packages specified by a character vector: worker.init <- function(packages) { for (p in packages) { library(p, character.only=TRUE) } NULL } clusterCall(cl, worker.init, c('MASS', 'boot')) Setting the character.only argument to TRUE makes library() interpret the argument as a character variable. If we didn’t do that, library() would attempt to load a package named p repeatedly. Although it’s not as commonly used as clusterCall(), the clusterApply() function is also useful for initializing the cluster workers since it can send different data to the initialization function for each worker. The following creates a global variable on each of the cluster workers that can be used as a unique worker ID: clusterApply(cl, seq(along=cl), function(id) WORKER.ID <<- id) Load Balancing with clusterApplyLB We introduced the clusterApply() function in the parallel K-Means example. The next parallel execution function that I’ll discuss is clusterApplyLB(). It’s very similar to clusterApply(), but instead of scheduling tasks in a round-robin fashion, it sends new tasks to the cluster workers as they complete their previous task. By round-robin, I mean that clusterApply() distributes the elements of x to the cluster workers one at a time, in the same way that cards are dealt to players in a card game. In a sense, clusterApply() (politely) pushes tasks to the workers, while clusterApplyLB() lets the workers pull tasks as needed. That can be more efficient if some tasks take longer than others, or if some cluster workers are slower. #This is guaranteed since clusterEvalQ() is implemented using clusterCall(). * Defining anonymous functions like this is very useful, but can be a source of performance problems due to R’s scoping rules and the way it serializes functions. See “Functions and Environments” on page 23 for more information. † The return value from library() isn’t big, but if the initialization function was assigning a large matrix to a variable, you could inadvertently send a lot of data back to the master, significantly hurting the performance of your program. Working with It | 13 www.it-ebooks.info
  26. To demonstrate clusterApplyLB(), we’ll execute Sys.sleep() on the workers, giving us complete control over the task lengths. Since our real interest in using cluster ApplyLB() is to improve performance, we’ll use snow.time() to gather timing informa- tion about the overall execution.‡ We will also use snow.time()’s plotting capability to visualize the task execution on the workers: set.seed(7777442) sleeptime <- abs(rnorm(10, 10, 10)) tm <- snow.time(clusterApplyLB(cl, sleeptime, Sys.sleep)) plot(tm) Ideally there would be solid horizontal bars for nodes 1 through 4 in the plot, indicating that the cluster workers were always busy, and therefore running efficiently. cluster ApplyLB() did pretty well, although there was some wasted time at the end. Now let’s try the same problem with clusterApply():Đ set.seed(7777442) sleeptime <- abs(rnorm(10, 10, 10)) tm <- snow.time(clusterApply(cl, sleeptime, Sys.sleep)) plot(tm) ‡ snow.time() is available in snow as of version 0.3-5. ĐI’m setting the RNG seed so we get the same value of sleeptime as in the previous example. 14 | Chapter 2: snow www.it-ebooks.info
  27. As you can see, clusterApply() is much less efficient than clusterApplyLB() in this example: it took 53.7 seconds, versus 28.5 seconds for clusterApplyLB(). The plot shows how much time was wasted due to the round-robin scheduling. But don’t give up on clusterApply(): it has its uses. It worked fine in the parallel K- Means example because we had the same number of tasks as workers. It is also used to implement the very useful parLapply() function, which we will discuss next.‖ Task Chunking with parLapply Now that we’ve discussed and compared clusterApply() and clusterApplyLB(), let’s consider parLapply(), a third parallel lapply() function that has the same arguments and basic behavior as clusterApply() and clusterApplyLB(). But there is an important difference that makes it perhaps the most generally useful of the three. ‖ It’s also possible that the extra overhead in clusterApplyLB() to determine which worker is ready for the next task could make clusterApply() more efficient in some case, but I’m skeptical. Working with It | 15 www.it-ebooks.info
  28. parLapply() is a high-level snow function, that is actually a deceptively simple function wrapping an invocation of clusterApply(): > parLapply function (cl, x, fun, ) docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, fun, )) Basically, parLapply() splits up x into a list of subvectors, and processes those subvec- tors on the cluster workers using lapply(). In effect, it is prescheduling the work by dividing the tasks into as many chunks as there are workers in the cluster. This is functionally equivalent to using clusterApply() directly, but it can be much more ef- ficient, since there are fewer I/O operations between the master and the workers. If the length of x is already equal to the number of workers, then parLapply() has no advant- age. But if you’re parallelizing an R script that already uses lapply(), the length of x is often very large, and at any rate is completely unrelated to the number of workers in your cluster. In that case, parLapply() is a better parallel version of lapply() than clusterApply(). One way to think about it is that parLapply() interprets the x argument differently than clusterApply(). clusterApply() is low-level, and treats x as a specification of the tasks to execute on the cluster workers using fun. parLapply() treats x as a source of disjoint input arguments to execute on the cluster workers using lapply() and fun. cluster Apply() gives you more control over what gets sent to who, while parLapply() provides a convenient way to efficiently divide the work among the cluster workers. An interesting consequence of parLapply()’s work scheduling is that it is much more efficient than clusterApply() if you have many more tasks than workers, and one or more large, additional arguments to pass to parLapply(). In that case, the additional arguments are sent to each worker only once, rather than possibly many times. Let’s try doing that, using a slightly altered parallel sleep function that takes a matrix as an argument: bigsleep <- function(sleeptime, mat) Sys.sleep(sleeptime) bigmatrix <- matrix(0, 2000, 2000) sleeptime <- rep(1, 100) I defined the sleeptimes to be small, many, and equally sized. This will accentuate the performance differences between clusterApply() and parLapply(): tm <- snow.time(clusterApply(cl, sleeptime, bigsleep, bigmatrix)) plot(tm) 16 | Chapter 2: snow www.it-ebooks.info
  29. This doesn’t look very efficient: you can see that there are many sends and receives between the master and the workers, resulting in relatively big gaps between the com- pute operations on the cluster workers. The gaps aren’t due to load imbalance as we saw before: they’re due to I/O time. We’re now spending a significant fraction of the elapsed time sending data to the workers, so instead of the ideal elapsed time of 25 seconds,# it’s taking 77.9 seconds. Now let’s do the same thing using parLapply(): tm <- snow.time(parLapply(cl, sleeptime, bigsleep, bigmatrix)) plot(tm) #The ideal elapsed time is sum(sleeptime) / length(cl). Working with It | 17 www.it-ebooks.info
  30. The difference is dramatic, both visually and in elapsed time: it took only 27.2 seconds, beating clusterApply() by 50.7 seconds. Keep in mind that this particular use of clusterApply() is bad: it is needlessly sending the matrix to the worker with every task. There are various ways to fix that, and using parLapply() happens to work well in this case. On the other hand, if you’re sending huge objects in x, then there’s not much you can do, and parLapply() isn’t going to help. My point is that parLapply() schedules work in a useful and efficient way, making it probably the single most useful parallel execution function in snow. When in doubt, use parLapply(). Vectorizing with clusterSplit In the previous section I showed you how parLapply() uses clusterApply() to imple- ment a parallel operation that solves a certain class of parallel program quite nicely. Recall that parLapply() executes a user-supplied function for each element of x just like clusterApply(). But what if we want the function to operate on subvectors of x? That’s similar to what parLapply() does, but is a bit easier to implement, since it doesn’t need to use lapply() to call the user’s function. 18 | Chapter 2: snow www.it-ebooks.info
  31. We could use the splitList() function, like parLapply() does, but that is a snow internal function. Instead, we’ll use the clusterSplit() function which is very similar, and slightly more convenient. Let’s try splitting the sequence from 1 to 30 for our cluster using clusterSplit(): > clusterSplit(cl, 1:30) [[1]] [1] 1 2 3 4 5 6 7 8 [[2]] [1] 9 10 11 12 13 14 15 [[3]] [1] 16 17 18 19 20 21 22 [[4]] [1] 23 24 25 26 27 28 29 30 Since our cluster has four workers, it splits the sequence into a list of four nearly equal length vectors, which is just what we need. Now let’s define parVapply() to split x using clusterSplit(), execute the user function on each of the pieces using clusterApply(), and combine the results using do.call() and c(): parVapply parVapply(cl, 1:10, "^", 1/3) [1] 1.000000 1.259921 1.442250 1.587401 1.709976 1.817121 1.912931 2.000000 [9] 2.080084 2.154435 This works because the ^ function takes a vector as its first argument and returns a vector of the same length.* This technique can be a useful for executing vector functions in parallel. It may also be more efficient than using parLapply(), for example, but for any function worth executing in parallel, the difference in efficiency is likely to be small. And remember that most, if not all, vector functions execute so quickly that it is never worth it to execute them in parallel with snow. Such fine-grained problems fall much more into the domain of multithreaded computing. * Normally the second argument to ^ can have the same length as the first, but it must be length one in this example because parVapply() only splits the first argument. Working with It | 19 www.it-ebooks.info
  32. Load Balancing Redux We’ve talked about the advantages of parLapply() over clusterApply() at some length. In particular, when there are many more tasks than cluster workers and the task objects sent to the workers are large, there can be serious performance problems with cluster Apply() that are solved by parLapply(). But what if the task execution has significant variation so that we need load balancing? clusterApplyLB() does load balancing, but would have the same performance problems as clusterApply(). We would like a load balancing equivalent to parLapply(), but there isn’t one—so let’s write it.† In order to achieve dynamic load balancing, it helps to have a number of tasks that is at least a small integer multiple of the number of workers. That way, a long task assigned to one worker can be offset by many shorter tasks being done by other workers. If that is not the case, then the other workers will sit idle while the one worker completes the long task. parLapply() creates exactly one task per worker, which is not what we want in this case. Instead, we’ll first send the function and the fixed arguments to the cluster workers using clusterCall(), which saves them in the global environment, and then send the varying argument values using clusterApplyLB(), specifying a function that will execute the user-supplied function along with the full collection of arguments. Here are the function definitions for parLapplyLB() and the two functions that it exe- cutes on the cluster workers: parLapplyLB <- function(cl, x, fun, ) { clusterCall(cl, LB.init, fun, ) r <- clusterApplyLB(cl, x, LB.worker) clusterEvalQ(cl, rm('.LB.fun', '.LB.args', pos=globalenv())) r } LB.init <- function(fun, ) { assign('.LB.fun', fun, pos=globalenv()) assign('.LB.args', list( ), pos=globalenv()) NULL } LB.worker <- function(x) { do.call('.LB.fun', c(list(x), .LB.args)) } parLapplyLB() initializes the workers using clusterCall(), executes the tasks with clusterApplyLB(), cleans up the global environment of the cluster workers with clusterEvalQ(), and finally returns the task results. †A future release of snow could optimize clusterApplyLB() by not sending the function and constant arguments to the workers in every task. At that point, this example will lose any practical value that it may have. 20 | Chapter 2: snow www.it-ebooks.info
  33. That’s all there is to implementing a simple and efficient load balancing parallel exe- cution function. Let’s compare clusterApplyLB() to parLapplyLB() using the same test function that we used to compare clusterApply() and parLapply(), starting with clusterApplyLB(): bigsleep <- function(sleeptime, mat) Sys.sleep(sleeptime) bigmatrix <- matrix(0, 2000, 2000) sleeptime <- rep(1, 100) tm <- snow.time(clusterApplyLB(cl, sleeptime, bigsleep, bigmatrix)) plot(tm) There are lots of gaps in the execution bars due to high I/O time: the master is barely able to supply the workers with tasks. Obviously this problem isn’t going to scale to many more workers. Now let’s try our new parLapplyLB() function: tm <- snow.time(parLapplyLB(cl, sleeptime, bigsleep, bigmatrix)) plot(tm) Working with It | 21 www.it-ebooks.info
  34. That took only 28.4 seconds versus 53.2 seconds for clusterApplyLB(). Notice that the first task on each worker has a short execution time, but a long task send time, as seen by the slope of the first four lines between the master (node 0) and the workers (nodes 1-4). Those are the worker initialization tasks executed by cluster Call() that send the large matrix to the workers. The tasks executed via clusterApplyLB() were more efficient, as seen by the vertical communication lines and the solid horizontal bars. By using short tasks, I was able to demonstrate a pretty noticeable dif- ference in performance, but with longer tasks, the difference becomes less significant. In other words, we can realize decent efficiency when- ever the time to compute a task significantly exceeds the time needed to send the inputs to and return the outputs from the worker evaluating the task. 22 | Chapter 2: snow www.it-ebooks.info
  35. Functions and Environments This section discusses a number of rather subtle points. An understand- ing of these is not essential for basic snow use, but could be invaluable when trying to debug more complicated usage scenarios. The reader may want to skim through this on a first reading, but remember to return to it if a seemingly obscure problem crops up. Most of the parallel execution functions in snow take a function object as an argument, which I call the worker function, since it is sent to the cluster workers, and subsequently executed by them. In order to send it to the workers, the worker function must be serialized into a stream of bytes using the serialize() function.‡ That stream of bytes is converted into a copy of the original object using the unserialize() function. In addition to a list of formal arguments and a body, the worker function includes a pointer to the environment in which it was created. This environment becomes the parent of the evaluation environment when the worker function is executed, giving the worker function access to non-local variables. Obviously, this environment must be serialized along with the rest of the worker function in order for the function to work properly after being unserialized. However, environments are serialized in a special way in R. In general, the contents are included when an environment is serialized, but not always. Name space environments are serialized by name, not by value. That is, the name of the package is written to the resulting stream of bytes, not the symbols and objects contained in the environment. When a name space is unserialized, it is reconstructed by finding and loading the cor- responding package. If the package cannot be loaded, then the stream of bytes cannot be unserialized. The global environment is also serialized by name, and when it is un- serialized, the resulting object is simply a reference to the existing, unmodified global environment. So what does this mean to you as a snow programmer? Basically, you must ensure that all the variables needed to execute the worker function are available after it has been unserialized on the cluster workers. If the worker function’s environment is the global environment and the worker function needs to access any variables in it, you need to send those variables to the workers explicitly. This can be done, for example, by using the clusterExport() function. But if the worker function was created by another func- tion, its environment is the evaluation environment of the creator function when the worker function was created. All the variables in this environment will be serialized along with the worker function, and accessible to it when it is executed by the cluster workers. This can be a handy way of making variables available to the worker function, ‡ Actually, if you specify the worker function by name, rather than by providing the definition of the function, most of the parallel execution functions (parLapply() is currently an exception) will use that name to look up that function in the worker processes, thus avoiding function serialization. Working with It | 23 www.it-ebooks.info
  36. but if you’re not careful, you could accidentally serialize large, unneeded objects along with the worker function, causing performance to suffer. Also, if you want the worker function to use any of the creator function’s arguments, you need to evaluate those arguments before calling parLapply() or clusterApplyLB(); otherwise, you may not be able to evaluate them successfully on the workers due to R’s lazy argument evaluation. Let’s look at a few examples to illustrate some of these issues. We’ll start with a script that multiplies a vector x by a sequence of numbers: a <- 1:4 x <- rnorm(4) clusterExport(cl, "x") mult <- function(s) s * x parLapply(cl, a, mult) In this script, the function mult() is defined at the top level, so its environment is the global environment.Đ Thus, x isn’t serialized along with mult(), so we need to send it to the cluster workers using the clusterExport() function. Of course, a more natural solution in this case would be to include x as an explicit argument to mult(), and then parLapply() would send it to the workers for us. However, using clusterExport() could be more efficient if we were going to reuse x by calling mult() many times with parLapply(). Now let’s turn part of this script into a function. Although this change may seem trivial, it actually changes the way mult() is serialized in parLapply(): pmult <- function(cl) { a <- 1:4 x <- rnorm(4) mult <- function(s) s * x parLapply(cl, a, mult) } pmult(cl) Since mult() is created by pmult(), all of pmult()’s local variables will be accessible when mult() is executed by the cluster workers, including x. Thus, we no longer call cluster Export(). Pmult() would be more useful if the values to be multiplied weren’t hardcoded, so let’s improve it by passing a and x in as arguments: pmult <- function(cl, a, x) { x # force x mult <- function(s) s * x parLapply(cl, a, mult) } scalars <- 1:4 dat <- rnorm(4) pmult(cl, scalars, dat) ĐYou can verify this with the command environment(mult). 24 | Chapter 2: snow www.it-ebooks.info
  37. At this point, you may be wondering why x is on a line by itself with the cryptic comment “force x”. Although it may look like it does nothing, this operation forces x to be eval- uated by looking up the value of the variable dat (the actual argument corresponding to x that is passed to the function when pmult() is invoked) in the caller’s execution environment. R uses lazy argument evaluation, and since x is now an argument, we have to force its evaluation before calling parLapply(); otherwise, the workers will re- port that dat wasn’t found, since they don’t have access to the environment where dat is defined. Note that they wouldn’t say x wasn’t found: they would find x, but wouldn’t be able to evaluate it because they don’t have access to dat. By evaluating x before calling parLapply(), mult()’s environment will be serialized with x set to the value of dat, rather than the symbol dat. Notice in this last example that, in addition to x, a and cl are also serialized along with mult(). mult() doesn’t need to access them, but since they are defined in pmult’s eval- uation environment, they will be serialized along with mult(). To prevent that, we can reset the environment of mult() to the global environment and pass x to mult() explicitly: pmult <- function(cl, a, x) { mult <- function(s, x) s * x environment(mult) <- .GlobalEnv parLapply(cl, a, mult, x) } scalars <- 1:4 dat <- rnorm(4) pmult(cl, scalars, dat) Of course, another way to achieve the same result is to create mult() at the top level of the script so that mult() is associated with the global environment in the first place. Unfortunately, you run into some tricky issues when sending function objects over the network. You may conclude that you don’t want to use the worker function’s envi- ronment to send data to your cluster workers, and that’s a perfectly reasonable position. But hopefully you now understand the issues well enough to figure out what methods work best for you. Random Number Generation As I mentioned previously, snow is very useful for performing Monte Carlo simulations, bootstrapping, and other operations that depend on the use of random numbers. When running such operations in parallel, it’s important that the cluster workers generate different random numbers; otherwise, the workers may all replicate each other’s results, defeating the purpose of executing in parallel. Rather than using ad-hoc schemes for seeding the workers differently, it is better to use a parallel random number generator package. snow provides support for the rlecuyer and rsprng packages, both of which are available on CRAN. With one of these packages installed on all the nodes of your cluster, you can configure your cluster workers to use it via the clusterSetupRNG() Working with It | 25 www.it-ebooks.info
  38. function. The type argument specifies which generator to use. To use rlecuyer, set type to RNGstream: clusterSetupRNG(cl, type='RNGstream') To use rsprng, set type to SPRNG: clusterSetupRNG(cl, type='SPRNG') You can specify a seed using the seed argument. rsprng uses a single integer for the seed, while rlecuyer uses a vector of six integers: clusterSetupRNG(cl, type='RNGstream', seed=c(1,22,333,444,55,6)) When using rsprng, a random seed is used by default, but not with rlecuyer. If you want to use a random seed with rlecuyer, you’ll have to specify it explicitly using the seed argument. Now the standard random number functions will use the specified parallel random number generator: > unlist(clusterEvalQ(cl, rnorm(1))) [1] -1.0452398 -0.3579839 -0.5549331 0.7823642 If you reinitialize the cluster workers using the same seed, you will get the same random number from each of the workers. We can also get reproducible results using clusterApply(), but not with clusterAp plyLB() because clusterApply() always uses the same task scheduling, while cluster ApplyLB() does not.‖ snow Configuration snow includes a number of configuration options for controlling the way the cluster is created. These options can be specified as named arguments to the cluster creation function (makeCluster(), makeSOCKcluster(), makeMPIcluster(), etc.). For example, here is the way to specify an alternate hostname for the master: cl <- makeCluster(3, type="SOCK", master="192.168.1.100") The default value of master is computed as Sys.info()[['nodename']]. However, there’s no guarantee that the workers will all be able to resolve that name to an IP address. By setting master to an appropriate dot- separated IP address, you can often avoid hostname resolution problems. ‖ Actually, you can achieve reproducibility with clusterApplyLB() by setting the seed to a task specific value. This can be done by adding the operation to the beginning of the worker function, or if using a function from a library, wrapping that function in a new function that sets the seed and then calls the library function. 26 | Chapter 2: snow www.it-ebooks.info
  39. You can also use the setDefaultClusterOptions() function to change a default config- uration option during an R session. By default, the outfile option is set to /dev/null, which causes all worker output to be redirected to the null device (the proverbial bit bucket). To prevent output from being redirected, you can change the default value of outfile to the empty string: setDefaultClusterOptions(outfile="") This is a useful debugging technique which we will discuss more in “Troubleshooting snow Programs” on page 33. Here is a summary of all of the snow configuration options: Table 2-1. snow configuration options Name Type Description Default value port Integer Port that the master listens on 10187 timeout Integer Socket timeout in seconds 31536000 (one year in seconds) master String Master’s hostname that workers con- Sys.info()["nodename"] nect to homogeneous Logical Are workers homogeneous? TRUE if R_SNOW_LIB set, else FALSE type String Type of cluster makeCluster should cre- NULL, which is handled specially ate outfile String Worker log file “/dev/null” “nul:” on Windows rhome String Home of R installation, used to locate R $R_HOME executable user String User for remote execution Sys.info()["user"] rshcmd String Remote execution command “ssh” rlibs String Location of R packages $R_LIBS scriptdir String Location of snow worker scripts snow installation directory rprog String Path of R executable $R_HOME/bin/R snowlib String Path of “library” where snow is installed directory in which snow is installed rscript String Path of Rscript command $R_HOME/bin/Rscript $R_HOME/bin/ Rscript.exe on Windows useRscript Logical Should workers be started using Rscript TRUE if file specified by Rscript exists command? manual Logical Should workers be started manually? FALSE It is possible, although a bit tricky, to configure different workers differently. I’ve done this when running a snow program in parallel on an ad-hoc collection of workstations. In fact, there are two mechanisms available for that with the socket transport. The first approach works for all the transports. You set the homogeneous option to FALSE, which causes snow to use a special startup script to launch the workers. This alternate script Working with It | 27 www.it-ebooks.info
  40. doesn’t assume that the worker nodes are set up the same as the master node, but can look for R or Rscript in the user’s PATH, for example. It also supports the use of envi- ronment variables to configure the workers, such as R_SNOW_RSCRIPT_CMD and R_SNOW_LIB to specify the path of the Rscript command and the snow installation di- rectory. These environment variables can be set to appropriate values in the user’s environment on each worker machine using the shell’s start up scripts. The second approach to heterogeneous configuration only works with the socket and nws transports. When you call makeSOCKcluster(), you specify the worker machines as a list of lists. In this case, the hostname of the worker is specified by the host element of each sublist. The other elements of the sublists are used to override the corresponding option for that worker. Let’s say we want to create a cluster with two workers: n1 and n2, but we need to log in as a different user on machine n2: > workerList cl clusterEvalQ(cl, Sys.info()[["user"]]) [[1]] [1] "weston" [[2]] [1] "steve" > stopCluster(cl) It can also be useful to set the outfile option differently to avoid file conflicts between workers: > workerList cl clusterEvalQ(cl, Sys.glob("*.log")) [[1]] [1] "n1.log" [[2]] [1] "n2-1.log" "n2-2.log" [[3]] [1] "n2-1.log" "n2-2.log" > stopCluster(cl) This also demonstrates that different methods for setting options can be used together. The machine-specific option values always take precedence. 28 | Chapter 2: snow www.it-ebooks.info
  41. I prefer to use my ssh config file to specify a different user for different hosts, but obviously that doesn’t help with setting outfile. Installing Rmpi As I mentioned previously, installing Rmpi can be problematic because it depends on MPI being previously installed. Also, there are multiple MPI distributions, and some of the older distributions have compatibility problems with Rmpi. In general, Open MPI is the preferred MPI distribution. Fortunately, Open MPI is readily available for modern Linux systems. The website for the Open MPI Project is Another problem is that there isn’t a binary distribution of Rmpi available for Windows. Thus, even if you have MPI installed on a Windows machine, you will also need to install Rmpi from the source distribution, which requires additional tools that may also need to be installed. For more information on installing Rmpi on Windows, see the documentation in the Rmpi package. That’s beyond the scope of this book. Installation of Rmpi on the Mac was quite simple on Mac OS X 10.5 and 10.6, both of which came with Open MPI, but unfortunately, Apple stopped distributing it in Mac OS X 10.7. If you’re using 10.5 or 10.6, you can (hopefully) install Rmpi quite easily:# install.packages("Rmpi") If you’re using Mac OS X 10.7, you’ll have to install Open MPI first, and then you’ll probably have to build Rmpi from the source distribution since the binary distribution probably won’t be compatible with your installation of Open MPI. I’ll discuss installing Rmpi from the source distribution shortly, but not Open MPI. On Debian/Ubuntu, Rmpi is available in the “r-cran-rmpi” Debian package, and can be installed with apt-get. That’s the most foolproof way to install Rmpi on Ubuntu, for example, since apt-get will automatically install a compatible version of MPI, if nec- essary. For non-Debian based systems, I recommend that you install Open MPI with your local packaging tool, and then try to use install.packages() to install Rmpi. This will fail if the configuration script can’t find the MPI installation. In that case you will have to download the source distribution, and install it using a command such as: % R CMD INSTALL configure-args=" with-mpi=$MPI_PATH" Rmpi_0.5-9.tar.gz #It’s possible that newer versions of Rmpi won’t be built for the Mac on CRAN because it won’t work on Mac OS X 10.7, but it’s still available as I’m writing this in September 2011. Working with It | 29 www.it-ebooks.info
  42. where the value of MPI_PATH is the directory containing the Open MPI lib and include directories.* Notice that this example uses the configure-args argument to pass the with-mpi argument to Rmpi’s configure script. Another important configure argument is with-Rmpi-type, which may need to be set to “OPENMPI”, for example. As I’ve said, installing Rmpi from source can be difficult. If you run into problems and don’t want to switch to Debian/Ubuntu, your best bet is to post a question on the R project’s “R-sig-hpc” mailing list. You can find it by clicking on the “Mailing Lists” link on the R project’s home page. Executing snow Programs on a Cluster with Rmpi Throughout this chapter I’ve been using the socket transport because it doesn’t require any additional software to install, making it the most portable snow transport. However, the MPI transport is probably the most popular, at least on clusters. Of course, most of what we’ve discussed is independent of the transport. The difference is mostly in how the cluster object is created and how the snow script is executed. To create an MPI cluster object, set the type argument of makeCluster() to MPI or use the makeMPIcluster() function. If you’re running interactively, you can create an MPI cluster object with four workers as follows: cl <- makeCluster(4, type="MPI") This is equivalent to: cl <- makeMPIcluster(4) This creates a spawned cluster, since the workers are all started by snow for you via the mpi.comm.spawn() function. Notice that we don’t specify which machines to use, only the number of workers. For that reason, I like to compute the worker count using the mpi.universe.size() function, which returns the size of the initial runtime environment.† Since the master process is included in that size, the worker count would be computed as mpi.universe.size() - 1.‡ We shut down an MPI cluster the same as any cluster: stopCluster(cl) *I use the command locate include/mpi.h to find this directory. On my machine, this returns/usr/lib/ openmpi/include/mpi.h, so I set MPI_PATH to /usr/lib/openmpi. † mpi.universe.size() had a bug in older versions of Rmpi, so you may need to upgrade to Rmpi 0.5-9. ‡I don’t use mpi.universe.size() when creating an MPI cluster in an interactive session, since in that context, mpi.universe.size() returns 1, which would give an illegal worker count of zero. 30 | Chapter 2: snow www.it-ebooks.info
  43. As you can see, there isn’t much to creating an MPI cluster object. You can specify configuration options, just as with a socket cluster, but basically it is very simple. However, you should be aware that the cluster workers are launched differently de- pending on how the R script was executed. If you’re running interactively, for example, the workers will always be started on the local machine. The only way that I know of to start the workers on remote machines is to execute the R interpreter using a command such as mpirun, mpiexec, or in the case of Open MPI, orterun. As I noted previously, you can’t specify the machines on which to execute the workers with makeMPIcluster(). That is done with a separate program that comes with your MPI distribution. Open MPI comes with three utilities for executing MPI programs: orterun, mpirun, and mpiexec, but they all work in exactly the same way,Đ so I will refer to orterun for the rest of this discussion. orterun doesn’t know anything about R or R scripts, so we need to use orterun to execute the R interpreter, which in turn executes the R script. Let’s start by creating an R script (Example 2-1), which I’ll call mpi.R. Example 2-1. mpi.R library(snow) library(Rmpi) cl <- makeMPIcluster(mpi.universe.size() - 1) r <- clusterEvalQ(cl, R.version.string) print(unlist(r)) stopCluster(cl) mpi.quit() This is very similar to our very first example, except that it loads the Rmpi package, calls makeMPIcluster() rather than makeSOCKcluster(), and calls mpi.quit() at the end. Loading Rmpi isn’t strictly necessary, since calling makeMPIcluster() will automatically load Rmpi, but I like to do it explicitly. makeMPIcluster() creates the MPI cluster object, as discussed in the previous section. mpi.quit() terminates the MPI execution envi- ronment, detaches the Rmpi package, and quits R, so it should always go at the end of your script. This is often left out, but I believe it is good practice to call it.‖ I’ve gotten very stern warning messages from orterun in some cases when I failed to call mpi.quit(). To execute mpi.R using the local machine as the master, and n1, n2, n3 and n4 as the workers, we can use the command:# % orterun -H localhost,n1,n2,n3,n4 -n 1 R slave -f mpi.R Đ orterun, mpirun, and mpiexec are in fact the same program in Open MPI. ‖ You can use mpi.finalize() instead, which doesn’t quit R. #The orterun command in Open MPI accepts several different arguments to specify the host list and the number of workers. It does this to be compatible with previous MPI distributions, so don’t be confused if you’re used to different argument names. Working with It | 31 www.it-ebooks.info
  44. The -H option specifies the list of machines available for execution. By using -n 1, orterun will only execute the command R slave -f mpi.R on the first machine in the list, which is localhost in this example. This process is the master, equivalent to the interactive R session in our previous snow examples. When the master executes make MPIcluster(mpi.universe.size() - 1), four workers will be spawned. orterun will ex- ecute these workers on machines n1, n2, n3 and n4, since they are next in line to receive a process. Those are the basics, but there are a few other issues to bear in mind. First, the master and the worker processes have their working directory set to the working directory of the process executing orterun. That’s no problem for the master in our example, since the master runs on the same machine as orterun. But if there isn’t a directory with the same path on any of the worker machines, you will get an error. For that reason, it is useful to work from a directory that is shared across the cluster via a network file system. That isn’t necessary, however. If you specify the full path to the R script, you could use the orterun -wdir option to set the working directory to /tmp: % orterun -wdir /tmp -H localhost,n1,n2,n3,n4 -n 1 R slave -f ~/mpi.R This example still assumes that R is in your search path on localhost. If it isn’t, you can specify the full path of the R interpreter on localhost. That can solve some of the orterun related problems, but snow still makes a number of assumptions about where to find things on the workers as well. See “snow Configura- tion” on page 26 for more information. Executing snow Programs with a Batch Queueing System Many cluster administrators require that all parallel programs be executed via a batch queueing system. There are different ways that this can be done, and different batch queueing systems, but I will describe a method that has been commonly used for a long time, and is supported by many batch queueing systems, such as PBS/TORQUE, SGE and LSF. Basically you submit a shell script, and the shell script executes your R script using orterun as we described in the section “Executing snow Programs on a Cluster with Rmpi” on page 30. When you submit the shell script, you tell the batch queueing system how many nodes you want using the appropriate argument to the submit command. The shell script may need to read an environment variable to learn what nodes it can execute on, and then pass that information on to the orterun command via an argument such as -hostfile or -H. Of course the details vary depending on the batch queueing system, MPI distribution, and cluster configuration. As an example, I’ll describe how this can be done using PBS/ TORQUE and Open MPI. 32 | Chapter 2: snow www.it-ebooks.info
  45. It’s actually very simple to use PBS/TORQUE with Open MPI, since Open MPI auto- matically gets the list of hosts using the environment variables set by PBS/TORQUE.* The code in Example 2-2 simplifies the orterun command used in the script. Example 2-2. batchmpi.sh #!/bin/sh #PBS -N SNOWMPI #PBS -j oe cd $PBS_O_WORKDIR orterun -n 1 /usr/bin/R slave -f mpi.R > mpi-$PBS_JOBID.out 2>&1 This script uses PBS directives to specify the name of the job, and to merge the job’s standard output and standard error. It then cd’s to the directory from which you sub- mitted the job, which is helpful for finding the mpi.R script. Finally it uses orterun to execute mpi.R. We submit batchmpi.sh using the PBS/TORQUE qsub command: % qsub -q devel -l nodes=2:ppn=4 batchmpi.sh This submits the shell script to the devel queue, requesting two nodes with four pro- cessors per node. The -l option is used to specify the resources needed by the job. The resource specifications vary from cluster to cluster, so talk to your cluster administrator to find out how you should specify the number of nodes and processors. If you’re using LSF or SGE, you will probably need to specify the hosts via the orterun - hostfile or -H option. For LSF, use the bsub -n option to specify the number of cores, and the LSB_HOSTS environment variable to get the allocated hosts. With SGE, use the qsub -pe option and the PE_HOSTFILE environment variable. The details are different, but the basic idea is the same. Troubleshooting snow Programs Unfortunately, a lot of things can go wrong when using snow. That’s not really snow’s fault: there’s just a lot of things that have to be set up properly, and if the different cluster nodes are configured differently, snow may have trouble launching the cluster workers. It’s possible to configure snow to deal with heterogeneous clusters.† Fortu- nately, if your cluster is already used for parallel computing, there’s a good chance it is already set up in a clean, consistent fashion, and you won’t run into any problems when using snow. Obviously you need to have R and snow installed on all of the machines that you’re attempting to use for your cluster object. You also need to have ssh servers running on all of the cluster workers if using the socket transport, for instance. * Actually, it’s possible to configure Open MPI without support for PBS/TORQUE, in which case you’ll have to include the arguments -hostfile $PBS_NODEFILE when executing orterun. † We discuss heterogeneous configuration in “snow Configuration” on page 26. Working with It | 33 www.it-ebooks.info
  46. There are several techniques available for finding out more information about what is going wrong. When using the socket transport, the single most useful method of troubleshooting is manual mode. In manual mode, you start the workers yourself, rather than having snow start them for you. That allows you to run snow jobs on a cluster that doesn’t have ssh servers, for example. But there are also a few other advantages to manual mode. For one thing, it makes it easier to see error messages. Rather than searching for them in log files, they can be displayed right in your terminal session. To enable manual mode, set the manual option to TRUE when creating the socket cluster object. I also recommend specifying outfile="", which prevents output from being redirected: cl cl <- makeCluster(c('n1', 'n2'), type="SOCK", manual=TRUE, outfile="") Manually start worker on n1 with /usr/lib/R/bin/Rscript /usr/lib/R/site-library/snow/RSOCKnode.R MASTER=beard PORT=10187 OUT= SNOWLIB=/usr/lib/R/site-library The argument MASTER=beard indicates that the value of the master option is “beard.” You can now use the ping command from your terminal window on n1 to see if the master is reachable from n1 by that name. Here’s the kind of output that you should see: n1% ping beard PING beard (192.168.1.109) 56(84) bytes of data. 64 bytes from beard (192.168.1.109): icmp_req=1 ttl=64 time=0.020 ms ‡If ssh fails at this point, you may have found your problem. 34 | Chapter 2: snow www.it-ebooks.info
  47. This demonstrates that n1 is able to resolve the name “beard,” knows a network route to that IP address, can get past any firewall, and is able to get a reply from the master machine.Đ But if ping issues the error message “ping: unknown host beard”, then you have a hostname resolution problem. Setting the master option to a different value when cre- ating the cluster might fix the problem. Other errors may indicate a networking problem that can be fixed by your sysadmin. If the value of master seems good, you should execute the command displayed by makeCluster() in hopes of getting a useful error message. Note that many of these problems could occur using any snow transport, so running a simple snow test code using the socket transport and manual mode can be an effective means to ensure a good setup even if you later intend to use a different transport. The outfile option in itself is also useful for troubleshooting. It allows you to redirect debug and error messages to a specified file. By default, output is redirected to /dev/ null. I often use an empty string ("") to prevent any redirection, as we described previously. Here are some additional troubleshooting tips: • Start by running on only one machine to make sure that works • Manually ssh to all of the workers from the master machine • Set the master option to a value that all workers can resolve, possibly using a dot- separated IP address • Run your job from a directory that is available on all machines • Check if there are any firewalls that might interfere When It Works snow is a fairly high-level package, since it doesn’t focus on low-level communication operations, but on execution. It provides a useful variety of functions that support embarrassingly parallel computation. Đ Of course, just because ping can get past a firewall doesn’t mean that snow can. As you can see from the manual mode output, the master process is listening on port 10187, so you may have to configure your firewall to allow connections on that port. You could try the command telnet beard 10187 as a further test. When It Works | 35 www.it-ebooks.info
  48. And When It Doesn’t Communications difficulties: snow doesn’t provide functions for explicitly commu- nicating between the master and workers, and in fact, the workers never communicate between themselves. In order to communicate between workers, you would have to use functions in the underlying communication package. Of course, that would make your program less portable, and more complicated. A package that needed to do that would probably not use snow, but use a package like nws or Rmpi directly. The Wrap-up In this chapter, you got a crash course on the snow package, including some advanced topics such as running snow programs via a batch queueing system. snow is a powerful package, able to run on clusters with hundreds of nodes. But if you’re more interested in running on a quad-core laptop than a supercomputer, the next chapter on the multicore package will be of particular interest to you. 36 | Chapter 2: snow www.it-ebooks.info
  49. CHAPTER 3 multicore multicore is a popular parallel programming package for use on multiprocessor and multicore computers. It was written by Simon Urbanek, and first released on CRAN in 2009. It immediately became popular because its clever use of the fork() system call allows it to implement a parallel lapply() operation that is even easier to use than snow’s parLapply(). Unfortunately, because fork() is a Posix system call, multicore can’t really be used on Windows machines.* Fork() can also cause problems for functions that use resources that were allocated or initialized exclusively for the master, or parent process. This is particularly a problem with graphics functions, so it isn’t generally recommended to use multicore with an R GUI.† Nevertheless, multicore works perfectly for most R functions on Posix systems, such as Linux and Mac OS X, and its use of fork() makes it very efficient and convenient, as we’ll see in this chapter. Quick Look Motivation: You have an R script that spends an hour executing a function using lapply() on your laptop. Solution: Replace lapply() with the mclapply() function from the multicore package. Good because: It’s easy to install, easy to use, and makes use of hardware that you probably already own. * An experimental attempt was made to support Windows in multicore 0.1-4 using the Windows NT/2000 Native API, but it only partially works on Windows 2000 and XP, and not at all on Vista and Windows 7. † multicore 0.1-4 attempts to disable the event loop in forked processes on Mac OS X in order to support the Mac GUI for R. 37 www.it-ebooks.info
  50. How It Works multicore is intended to run on Posix-based multiprocessor and multicore systems. This includes almost all modern Mac OS X and Linux desktop and laptop computers. It can also be used on single nodes of a Linux cluster, for example, but it doesn’t support the use of multiple cluster nodes, like snow. Since multicore is rather efficient, it can handle somewhat finer-grained parallel prob- lems than snow, but it is still intended for coarse-grained, embarrassingly parallel applications. It cannot compete with multithreaded programming for performing fine- grained parallelism, such as vector operations, for example. Since multicore runs on a single computer, it doesn’t give you access to greater aggre- gate memory, like snow. However, since fork() only copies data when it is modified, multicore often makes more efficient use of memory on a single computer than snow can on a single computer. Setting Up multicore is available on CRAN, so it is installed like any other CRAN package. Much of it is written in C, but it doesn’t depend on any external libraries, so building it from source is fairly easy on Posix-based systems. Here’s how I usually install multicore: install.packages("multicore") It may ask you which CRAN mirror to use, and then it will download and install the package. There is no Windows binary distribution available for multicore on CRAN, so if you’re using Windows 2000 or XP, and want to try the experimental Windows support, you’ll have to build it from the source distribution. This requires additional software to be installed, and is beyond the scope of this book. Once you’ve installed multicore, you should verify that you can load it: library(multicore) If that succeeds, you are ready to start using multicore. 38 | Chapter 3: multicore www.it-ebooks.info
  51. Working with It The mclapply Function The most important and commonly used function in the multicore package is mcl apply(), which is basically a drop-in replacement for lapply(). It is one of the high- level functions in multicore, the others being pvec(), parallel(), and collect(), which we will discuss later. Although mclapply() takes some additional arguments (all prefixed with “mc.”), it is essentially the same as lapply(). If you have an R script that spends a lot of time calling lapply(), it’s very possible that all you will have to do to parallelize it is to load the multicore package and replace lapply() with mclapply(). For example, let’s write a parallel K-Means using multicore: library(multicore) library(MASS) results <- mclapply(rep(25, 4), function(nstart) kmeans(Boston, 4, nstart=nstart)) i <- sapply(results, function(result) result$tot.withinss) result <- results[[which.min(i)]] This is nearly identical to the sequential, lapply() version of K-Means from the snow chapter, except that we loaded the multicore package and replaced lapply() with mclapply(). In particular, we didn’t have to create a cluster object, and we didn’t have to initialize the workers by loading the MASS package on each of them. This is because mclapply() automatically starts the workers using fork(). These workers inherit the functions, variables and environment of the master process, making explicit worker initialization unnecessary. It may surprise you that mclapply() creates worker processes every time it is called. snow doesn’t do that since starting workers on a cluster is often rather time consuming. However, fork() is relatively fast, especially since it doesn’t copy process data until it needs to, a technique called copy-on-write which takes advantage of the operating sys- tem’s virtual memory system. In addition, forking the workers every time mclapply() is called gives each of them a virtual copy of the master’s environment right at the point that mclapply() is executed, so worker data is in sync with the master. Thus, you don’t need to recreate the master’s data and environment in the workers, as in snow, since fork() does that automatically and efficiently. The mc.cores Option The mclapply() function takes a number of optional arguments that modify its behav- iour. One of the most important of these is the mc.cores argument which controls the number of workers that are created, which is often set equal to the number of cores on the computer. By default, mclapply() uses the value of getOption("cores"), which can Working with It | 39 www.it-ebooks.info
  52. be set using the standard options() function. If this option isn’t set, mclapply() will detect and use the number of cores on the computer. Let’s tell mclapply() to start two workers using mc.cores: > unique(unlist(mclapply(1:100, function(i) Sys.getpid(), mc.cores = 2))) [1] 4953 4954 As you can see, there are only two unique PIDs in the results, indicating that exactly two processes executed all 100 tasks. Cores or Workers? The mc.cores argument may sound like it specifies the number of cores to use, but it actually specifies the number of workers to start. If mc.cores is set equal to the number of cores and the resulting workers are the only compute intensive processes on the machine, then they will probably each get a core to themselves, but that’s up to your operating system’s scheduler. It is possible to influence the Linux scheduler through the sched_setaffinity() system call for example, but none of the functions in multi core do that. Now let’s use options() to specify three workers: > options(cores = 3) > unique(unlist(mclapply(1:100, function(i) Sys.getpid()))) [1] 4955 4956 4957 This will also control the number of workers started by the pvec() function, which we will discuss later. The mc.set.seed Option Another important mclapply() option is mc.set.seed. When mc.set.seed is set to TRUE, mclapply() will seed each of the workers to a different value after they have been created, which is mclapply()’s default behaviour. If mc.set.seed is set to FALSE, mclapply() won’t do anything with respect to the random number generator. In general, I would recommend that you leave mc.set.seed set to TRUE unless you have a good reason to turn it off. The problem with setting mc.set.seed to FALSE is that the worker processes will inherit the state of the master’s random number generator if it is set. Let’s experiment with setting mc.set.seed to FALSE. First, we’ll generate some random numbers on the workers using mclapply() when the master’s state is clean: > mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = FALSE) [[1]] [1] -1.268046 0.262834 2.415977 40 | Chapter 3: multicore www.it-ebooks.info
  53. [[2]] [1] -0.1817228 0.6496526 -0.7741212 [[3]] [1] -0.7378100 0.1080590 -0.5902874 All the values are different, so everything looks fine. But watch what happens if we generate a random number on the master, and then call mclapply() again: > rnorm(1) [1] 1.847741 > mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = FALSE) [[1]] [1] 0.6995516 -0.2436397 -0.6131929 [[2]] [1] 0.6995516 -0.2436397 -0.6131929 [[3]] [1] 0.6995516 -0.2436397 -0.6131929 Now the workers all produce identical random numbers, and they will produce the same numbers if I were to call mclapply() again! This happens because generating any random numbers or calling set.seed() creates a variable called .Random.seed in the global environment, and its value is used to generate subsequent random numbers. Therefore, if that variable exists on the master when mclapply() is executed, all the worker processes will inherit it and produce the same stream of random numbers unless something is done to reseed each of the workers. When mc.set.seed is TRUE, mclapply() will explicitly set the seed differently in each of the workers before calling the user’s function. Let’s try that after setting the seed in the master to make sure the workers do indeed produce different random numbers: > set.seed(7777442) > mclapply(1:3, function(i) rnorm(3), mc.cores = 3, mc.set.seed = TRUE) [[1]] [1] -1.0757472 -0.7850815 -0.1700620 [[2]] [1] -0.63224810 -0.04542427 1.46662809 [[3]] [1] -0.2067085 0.7669072 0.4032044 As of multicore 0.1-5, setting mc.set.seed to TRUE will cause mclapply() to execute set.seed(Sys.getpid()) in each of the workers. Thus, not only are the workers seeded differently from each other, but they are also seeded differently from the workers cre- ated by previous calls to mclapply().‡ ‡ Of course, Unix process IDs usually only go up to about 32767, so they will wrap around eventually, but I’ll ignore that issue. Working with It | 41 www.it-ebooks.info
  54. Load Balancing with mclapply What if you want load balancing with multicore? By default, mclapply() will work like snow’s parLapply() function. That is, it preschedules the work by dividing it into as many tasks as there are cores. Sometimes that works well, even if the tasks have very different lengths. But to best balance the work performed by each of the workers, pre- scheduling can be turned off by setting mc.preschedule to FALSE. This makes mclapply() work more like snow’s clusterApplyLB() function. Let’s use the parallel sleep example to see what difference prescheduling can make: > set.seed(93564990) > sleeptime system.time(mclapply(sleeptime, Sys.sleep, mc.cores = 4)) user system elapsed 0.012 0.008 64.763 > system.time(mclapply(sleeptime, Sys.sleep, mc.cores = 4, mc.preschedule = FALSE)) user system elapsed 0.032 0.028 57.347 Unfortunately we can’t easily generate performance plots, as with snow, but the elapsed times demonstrate that it can help to turn off prescheduling if the times to execute the aggregated tasks are different. The difference isn’t as great as we demonstrated between clusterApply() and clusterApplyLB(), since prescheduling tends to smooth out the differences in the length of individual tasks, but it can still be significant. Keep in mind that a new worker is forked for every element of the vector passed to mclapply() when prescheduling is turned off. That means that the performance could suffer if each call to the function is relatively short. In other words, you should probably only set mc.preschedule to FALSE if the tasks are both long and varying in length. Oth- erwise, it’s probably a safer bet to leave prescheduling turned on. The pvec Function The pvec() function was introduced in multicore 0.1-4. It is a high-level function used to execute vector functions in parallel. Let’s use it to take the cube root of a vector: > x pvec(x, "^", 1/3) [1] 1.000000 1.259921 1.442250 1.587401 1.709976 1.817121 1.912931 2.000000 [9] 2.080084 2.154435 This is like the parVapply() function that we developed in the snow chapter. In both cases, the worker function is executed on subvectors of the input vector, rather than on each element of it, making it potentially more efficient and convenient than mcl apply() for this case. pvec() takes the same additional arguments as mclapply() (all prefixed with “mc.”)— except for mc.preschedule, which isn’t appropriate for pvec(). 42 | Chapter 3: multicore www.it-ebooks.info
  55. Many vector functions, including ^, are not compute intensive enough to make the use of pvec() worthwhile. This example runs slower on my computers than the equivalent sequential version, regardless of the vec- tor length. The parallel and collect Functions The parallel() and collect() functions are the last of the high-level functions in multicore, and are used together. The parallel() function creates a new process using fork() to evaluate an expression in parallel with the calling process. It returns a parallelJob object which is passed to the collect() function to retrieve the result of the computation. collect() can be called with either a single parallelJob object, or a list of parallelJob objects. It returns the corresponding results in a list, in the same order that the jobs were specified to collect() (but only if wait is TRUE, as we’ll see later). Normally, you would call parallel() multiple times, and then use collect() to retrieve all of the results. This can be useful if you want to execute several different functions in parallel, or start a job running in the background and then do something else before waiting for it to complete. Let’s use parallel() and collect() to execute three different functions in parallel. For demonstration purposes, I’ll define very contrived functions that each sleep for a dif- ferent period of time and then return a number that identifies them: library(multicore) fun1 f1 f2 f3 collect(list(f1, f2, f3)) $`4862` [1] 1 $`4863` [1] 2 $`4864` [1] 3 As you can see, the results are returned in the same order that they were specified to collect(). That is the basic way of using parallel() and collect(). You can think of parallel() as a submit operation, and collect() as a wait operation, similar to batch queueing commands. Working with It | 43 www.it-ebooks.info
  56. Using collect Options The collect() function has two options that give you more control over how it waits for jobs started via parallel(): wait and timeout. If wait is set to TRUE (the default value), then collect() waits for all of the specified jobs to finish, regardless of the value of timeout, and returns the results in a list in the same order that the jobs were specified to collect(). But if wait is set to FALSE, then collect() waits for up to timeout seconds for at least one of the jobs to finish or a process to exit, and returns the results in a list in arbitrary order, using a NULL to indicate that a process exited. If no jobs finish in that time, collect() returns a NULL. To check for results without waiting at all, you call collect() with wait set to FALSE, and timeout set to its default value of 0. Let’s do that several times, pausing after the first collect() to wait for some results: > f1 f2 f3 collect(list(f1, f2, f3), wait=FALSE) NULL > Sys.sleep(15) > collect(list(f1, f2, f3), wait=FALSE) [[1]] [1] 3 [[2]] [1] 2 [[3]] [1] 1 > collect(list(f1, f2, f3), wait=FALSE) [[1]] NULL [[2]] NULL [[3]] NULL > collect(list(f1, f2, f3), wait=FALSE) NULL Here’s what each of the four values returned by collect() indicate: No results are available and no processes have exited fun3(), fun2(), and fun1() have completed All three of the processes have exited All results have been returned and all processes have exited 44 | Chapter 3: multicore www.it-ebooks.info
  57. The timeout argument allows you to wait a specified number of seconds for at least one result to complete or one process to exit (assuming wait is set to TRUE). Let’s do that repeatedly in order to collect all of the results: > f1 f2 f3 collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) [[1]] [1] 3 > collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) [[1]] NULL > collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) [[1]] [1] 2 > collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) [[1]] NULL > collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) [[1]] [1] 1 > collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) [[1]] NULL > collect(list(f1, f2, f3), wait=FALSE, timeout=1000000) NULL Here’s what each of the seven values returned by collect() indicate: fun3() has completed The process that executed fun3() has exited fun2() has completed the process that executed fun2() has exited fun1() has completed The process that executed fun1() has exited All results have been returned and all processes exited Note that if we had used a shorter timeout, such a 2, collect() would have returned some NULLs, indicating that the timeout had expired before any jobs completed or pro- cesses exited. Working with It | 45 www.it-ebooks.info