Skip to content
Prev 3843 / 21312 Next

[Bioc-devel] BiocParallel

Here are two alternative implementations of pvec. pvec2 is just a simple 
rewrite of pvec to use mclapply. pvec3 then extends pvec2 to accept a 
specified chunk size or a specified number of chunks. If the number of 
chunks exceeds the number of cores, then multiple chunks will get run 
sequentially on each core. pvec3 also exposes the "mc.prescheule" 
argument of mclapply, since that is relevant when there are more chunks 
than cores. Lastly, I provide a "pvectorize" function which can be 
called on a regular vectorized function to make it into a pvec'd version 
of itself. Usage is like: sqrt.parallel <- pvectorize(sqrt); 
sqrt.parallel(1:1000).

pvec2 <- function(v, FUN, ..., mc.set.seed = TRUE, mc.silent = FALSE,
                   mc.cores = getOption("mc.cores", 2L), mc.cleanup = TRUE)
{
   env <- parent.frame()
   cores <- as.integer(mc.cores)
   if(cores < 1L) stop("'mc.cores' must be >= 1")
   if(cores == 1L) return(FUN(v, ...))

   if(mc.set.seed) mc.reset.stream()

   n <- length(v)
   si <- splitIndices(n, cores)
   res <- do.call(c,
                  mclapply(si, function(i) FUN(v[i], ...),
                           mc.set.seed=mc.set.seed,
                           mc.silent=mc.silent,
                           mc.cores=mc.cores,
                           mc.cleanup=mc.cleanup))
   if (length(res) != n)
     warning("some results may be missing, folded or caused an error")
   res
}

pvec3 <- function(v, FUN, ..., mc.set.seed = TRUE, mc.silent = FALSE,
                   mc.cores = getOption("mc.cores", 2L), mc.cleanup = TRUE,
                   mc.preschedule=FALSE, num.chunks, chunk.size)
{
   env <- parent.frame()
   cores <- as.integer(mc.cores)
   if(cores < 1L) stop("'mc.cores' must be >= 1")
   if(cores == 1L) return(FUN(v, ...))

   if(mc.set.seed) mc.reset.stream()

   n <- length(v)
   if (missing(num.chunks)) {
     if (missing(chunk.size)) {
       num.chunks <- cores
     } else {
       num.chunks <- ceiling(n/chunk.size)
     }
   }
   si <- splitIndices(n, num.chunks)
   res <- do.call(c,
                  mclapply(si, function(i) FUN(v[i], ...),
                           mc.set.seed=mc.set.seed,
                           mc.silent=mc.silent,
                           mc.cores=mc.cores,
                           mc.cleanup=mc.cleanup,
                           mc.preschedule=mc.preschedule))
   if (length(res) != n)
     warning("some results may be missing, folded or caused an error")
   res
}

pvectorize <- function(FUN) {
   function(...) pvec3(FUN=FUN, ...)
}
On Wed 14 Nov 2012 02:23:30 PM PST, Michael Lawrence wrote: