Hello,
I'm trying to use snow and parApply to parallelize a little script of mine that computes Fisher tests, but I'm puzzled by the timing results. Maybe I should use a matrix and other functions like parRapply/parCapply?
On a fairly big data set (413795 tests to be done) the simple version took 449m33.496s, while the parallel one (with 4 nodes of a socket cluster) took 486m24.006s.
While running, it begins with 4 processes and after a short time only one remains active. This process occupies a lot of memory, so I'm guessing that it's the one taking care of the "reduce" step that puts the results together, but I would have hoped for better performance. [On a smaller subset of the data the parallel results were better than the serial ones.]
I'm now trying with MPI, but it seems to be heading toward the same results.
Maybe I'm doing something wrong, or have chosen the wrong data structure or something like that, since I'm not an experienced R programmer at all, so I'm pasting the relevant part of the source code here:
get_fisher <- function(counts){
  mat <- matrix(as.numeric(counts[c("a","b", "c", "d")]), ncol=2)
  colnames(mat) <- c('1', '2')
  rownames(mat) <- c('f', 'g')
  f <- fisher.test(as.table(mat), alt="two.sided")
  return(c(counts["id"], f$p.value))
}
if (!is.null(opt$parallel)) {
  library("snow")
  library("Rmpi")
  cl <- makeMPIcluster(opt$parallel)
  #cl <- makeSOCKcluster(rep("localhost", opt$parallel))
}
counts <- read.table("stdin", sep="\t")
colnames(counts) <- c("id", "exon1","gene1", "exon2", "gene2")
if (!is.null(opt$parallel)) {
  fishers <- parApply(cl, counts, 1, get_fisher)
  stopCluster(cl)
} else {
  fishers <- apply(counts, 1, get_fisher)
}
df <- as.data.frame(fishers)
write.table(df, file="", sep="\t", quote=FALSE, col=F, append=T, row.names=F)
Thank you for your help,
Elena Grassi
Snow, parApply computational times
4 messages · Elena Grassi, Martin Morgan
Hi Elena --
On 09/28/2012 04:02 AM, Elena Grassi wrote:
[...]
fishers <- parApply(cl, counts, 1, get_fisher)
The problem is that the amount of 'work' for each calculation is small, so that any gain from parallel calculation is offset by the cost of sending data back and forth, etc. Try to group the work into a list of tasks, and do each element of the task list on a separate processor.
Here is some data:
opt <- list(parallel = 8)
df <- data.frame(a=3, b=1, c=1, d=3, id=seq_len(1000))
We use the 'parallel' package, which comes with base R. On a non-Windows machine we'd rather use mclapply without explicitly making a cluster.
library(parallel)
cl <- makeCluster(opt$parallel, "SOCK")
Here's the function we want to apply to each row; I changed the return value so that it's a numeric vector rather than a list:
get_fisher <- function(counts){
  mat <- matrix(as.numeric(counts[c("a","b", "c", "d")]), ncol=2)
  colnames(mat) <- c('1', '2')
  rownames(mat) <- c('f', 'g')
  f <- fisher.test(as.table(mat), alt="two.sided")
  return(c(counts[["id"]], f$p.value))
}
Here we divide the work into a list of tasks:
idx <- splitIndices(nrow(df), opt$parallel)
It's worth taking a look at idx -- str(idx) -- a list of length opt$parallel, with each element containing a vector of indices representing the rows that a cluster node will operate on. Now send the elements of idx to the processor nodes, one vector of indices to each processor. On the processors, do the work using apply.
result0 <- parLapply(cl, idx, function(i, df, FUN) {
  apply(df[i,,drop=FALSE], 1, FUN)
}, df, get_fisher)
We need to knit our tasks back together, which in this case can be done with
result <- do.call(rbind, lapply(result0, t))
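To see what the splitting and knitting steps produce, here is a small sketch that can be run without a cluster. It assumes a toy data frame of 10 identical rows and 3 chunks (both made up for illustration), and uses plain lapply as a stand-in for parLapply so the intermediate shapes are easy to inspect.

```r
library(parallel)

# Toy data with the same layout as the example above (values are made up).
df <- data.frame(a = 3, b = 1, c = 1, d = 3, id = seq_len(10))

get_fisher <- function(counts) {
  mat <- matrix(as.numeric(counts[c("a", "b", "c", "d")]), ncol = 2)
  f <- fisher.test(mat, alternative = "two.sided")
  c(counts[["id"]], f$p.value)
}

# Three vectors of row indices that together cover rows 1..10.
idx <- splitIndices(nrow(df), 3)
str(idx)

# Plain lapply stands in for parLapply here, so no cluster is needed
# to look at the shape of the intermediate results.
result0 <- lapply(idx, function(i) {
  apply(df[i, , drop = FALSE], 1, get_fisher)
})

# Each element has 2 rows (id, p-value) and one column per input row.
sapply(result0, dim)

# Transpose each chunk and stack them: one row per test, in input order.
result <- do.call(rbind, lapply(result0, t))
colnames(result) <- c("id", "p.value")
dim(result)
```

Because apply returns one column per input row, each chunk has to be transposed before rbind stacks the chunks in their original order.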
Hope this helps,
Martin
Computational Biology / Fred Hutchinson Cancer Research Center 1100 Fairview Ave. N. PO Box 19024 Seattle, WA 98109 Location: Arnold Building M1 B861 Phone: (206) 667-2793
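The mclapply route mentioned above for non-Windows machines might look like the sketch below. It reuses the same toy data and get_fisher as the reply; the core count of 2 and the single-core fallback on Windows are assumptions made for illustration, not part of the original message.

```r
library(parallel)

df <- data.frame(a = 3, b = 1, c = 1, d = 3, id = seq_len(1000))

get_fisher <- function(counts) {
  mat <- matrix(as.numeric(counts[c("a", "b", "c", "d")]), ncol = 2)
  f <- fisher.test(mat, alternative = "two.sided")
  c(counts[["id"]], f$p.value)
}

# mclapply forks the current R process, which is not available on Windows,
# so fall back to a single core there. Two cores is an arbitrary choice.
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

# One vector of row indices per core.
idx <- splitIndices(nrow(df), n_cores)

# Forked workers already see df and get_fisher, so nothing is exported
# and no cluster object has to be created or stopped.
result0 <- mclapply(idx, function(i) {
  apply(df[i, , drop = FALSE], 1, get_fisher)
}, mc.cores = n_cores)

# Same knitting step as with parLapply.
result <- do.call(rbind, lapply(result0, t))
```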
On Fri, Sep 28, 2012 at 3:22 PM, Martin Morgan <mtmorgan at fhcrc.org> wrote:
the problem is that the amount of 'work' for each calculation is small, so that any gain from parallel calculation is offset by the cost of sending data back and forth, etc.
Yup, that's a sensible explanation, even if having only a single process alive for the majority of the computational time was a hint of something slightly different from my point of view, maybe wrong :) Thank you very much for your suggestion, I will give it a try right away (as soon as our computational resources are free, at least :) ). E.
Ok, I could not resist, and bang!, big improvement: 18m16.856s. With only 4 nodes, that's a huge improvement (even better than I expected). Thank you very much, E.
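For a script shaped like the one in the original post, the chunked approach could be wrapped in a small helper. The sketch below is only one possible arrangement: chunked_fisher and fisher_block are hypothetical names, the input is assumed to have columns id, a, b, c, d (the names get_fisher looks up, which differ from the header names set in the original script), the rows are made-up values read from an inline string instead of stdin, and the cluster has two local workers.

```r
library(parallel)

# Runs on a worker: one Fisher test per row of a numeric block of counts.
# Defined at top level so that sending it to the workers does not drag
# along the environment of another function.
fisher_block <- function(block) {
  apply(block, 1, function(x) fisher.test(matrix(x, ncol = 2))$p.value)
}

# Hypothetical helper: sends one block of rows to each worker of `cl`
# and returns a data frame with one p-value per input row.
chunked_fisher <- function(cl, counts) {
  m <- as.matrix(counts[, c("a", "b", "c", "d")])
  idx <- splitIndices(nrow(m), length(cl))
  chunks <- lapply(idx, function(i) m[i, , drop = FALSE])
  pvals <- parLapply(cl, chunks, fisher_block)
  data.frame(id = counts$id, p.value = unlist(pvals, use.names = FALSE))
}

# Made-up input standing in for the tab-separated data read from stdin.
input <- "e1\t3\t1\t1\t3\ne2\t10\t2\t3\t15\ne3\t5\t5\t5\t5\ne4\t1\t9\t8\t2\n"
counts <- read.table(text = input, sep = "\t",
                     col.names = c("id", "a", "b", "c", "d"),
                     stringsAsFactors = FALSE)

cl <- makeCluster(2)
fishers <- chunked_fisher(cl, counts)
stopCluster(cl)

write.table(fishers, file = "", sep = "\t", quote = FALSE,
            col.names = FALSE, row.names = FALSE)
```

Keeping the ids out of the matrix that apply iterates over means non-numeric ids do not force the counts to character, and each worker receives only its own rows rather than the whole table.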