"chunking" parallel tasks

11 messages · Mark Kimpel, Brian G. Peterson, Hodgess, Erin +4 more

#
Mark Kimpel wrote:
Google "nesting foreach loops"

The foreach package will do what you want.  Steve Weston has posted some 
examples to this list on this topic as well.

Regards,

   - Brian
#
In parallel computing, "chunking" is used to bundle multiple messages
together, since big messages can very often be sent much more efficiently
than short messages.  That means that in systems like "snow" and "foreach",
if your task executes very quickly, most of the time may be spent moving
data around, rather than doing useful work.  By bundling many tasks
together, you might be able to do the communication efficiently enough so
that you get a benefit from doing the tasks in parallel.  However, if you
have short tasks and large inputs and/or outputs, chunking won't really
help you, since it isn't going to make the communication more efficient.
You need to figure out some way to decrease the amount of data that is
being moved around.
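
The trade-off described above can be put into a toy cost model. This is only a sketch under assumed numbers: the function name, the latency and bandwidth defaults, and the model itself (one message per chunk, sent serially by the master) are illustrative assumptions written in Python, not measurements of snow or foreach.

```python
def elapsed_time(n_tasks, task_time, bytes_per_task, chunk_size,
                 latency=1e-3, bandwidth=1e8, workers=4):
    """Toy estimate of wall-clock seconds for n_tasks sent in chunks.

    Assumed model (not taken from any package): each chunk is one message
    costing `latency` seconds plus its payload divided by `bandwidth`;
    the master sends messages one after another, workers compute in parallel.
    """
    n_messages = -(-n_tasks // chunk_size)                 # ceiling division
    latency_cost = n_messages * latency                    # shrinks with chunking
    transfer_cost = n_tasks * bytes_per_task / bandwidth   # unaffected by chunking
    compute_cost = n_tasks * task_time / workers
    return latency_cost + transfer_cost + compute_cost


# Short tasks, tiny inputs: the per-message latency dominates, so chunking helps.
small_unchunked = elapsed_time(100_000, 1e-6, 8, chunk_size=1)
small_chunked = elapsed_time(100_000, 1e-6, 8, chunk_size=1000)

# Short tasks, large inputs: the transfer term dominates, so chunking barely helps.
big_unchunked = elapsed_time(100_000, 1e-6, 8_000_000, chunk_size=1)
big_chunked = elapsed_time(100_000, 1e-6, 8_000_000, chunk_size=1000)
```

In this model only the latency term depends on the chunk size, which is why the large-input case has to be fixed by moving less data rather than by bundling.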

The nws package supports "chunking" via the eachElem "chunkSize" element
of the "eo" argument.  The multicore package supports chunking as an all
or nothing thing via the "mc.preschedule" argument to mclapply.  The doMC
package uses the backend-specific "preschedule" option, which it passes on
to mclapply via the "mc.preschedule" argument.  The doMPI package uses
the backend-specific "chunkSize" option to specify any chunk size, much
like nws.

The iterators and itertools packages contain various functions that create
iterators that allow you to split up data in chunks, so they support "chunking"
in their own way.  That allows you to do manual chunking, as I call it, with
any of the foreach backends.
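
The idea of manual chunking can be sketched independently of any particular package. The Python below is an illustration only: `chunks`, `process_chunk`, and `chunked_map` are hypothetical names, and the code does not reproduce the iterators or itertools API. Each chunk stands for one task handed to a worker.

```python
def chunks(items, chunk_size):
    """Yield successive lists of at most chunk_size items."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


def process_chunk(chunk):
    """Stand-in for the work one worker does on one chunk (hypothetical)."""
    return [x * x for x in chunk]


def chunked_map(items, chunk_size):
    """Apply process_chunk to each chunk, then flatten the results in order."""
    results = []
    for chunk in chunks(items, chunk_size):  # each chunk would be one task
        results.extend(process_chunk(chunk))
    return results
```

The flattening step is what keeps the caller's view the same as an unchunked map: one result per input element, in the original order.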

The snow package has some internal functions that split vectors and matrices
into chunks.  They are used in functions such as parMM, parCapply, and
parRapply.
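
As a rough illustration of the row-splitting idea behind a function like parMM, here is a Python sketch. It is not snow's code: `split_rows`, `matmul`, and `blocked_matmul` are hypothetical names, and the blocks are processed serially where a cluster would hand one block to each worker.

```python
def split_rows(n_rows, n_blocks):
    """Split range(n_rows) into n_blocks contiguous, nearly equal (start, stop) pairs."""
    base, extra = divmod(n_rows, n_blocks)
    bounds, start = [], 0
    for i in range(n_blocks):
        stop = start + base + (1 if i < extra else 0)  # first `extra` blocks get one more row
        bounds.append((start, stop))
        start = stop
    return bounds


def matmul(a, b):
    """Plain list-of-lists matrix product a %*% b."""
    cols = list(zip(*b))
    return [[sum(x * y for x, y in zip(row, col)) for col in cols] for row in a]


def blocked_matmul(a, b, n_blocks):
    """Multiply each row block of a by b, then stack the partial results."""
    result = []
    for start, stop in split_rows(len(a), n_blocks):
        result.extend(matmul(a[start:stop], b))  # one block = one worker's task
    return result
```

Note that every block needs the whole of `b`, so the right-hand matrix is shipped to every worker; that is the communication cost the chunking cannot remove.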

- Steve
On Tue, Jan 26, 2010 at 9:44 AM, Brian G. Peterson <brian at braverock.com> wrote:
#
On 01/26/2010 07:48 AM, Stephen Weston wrote:
Another, perhaps more common, use is for 'cheap' load balancing, where the
individual tasks may take variable amounts of time (e.g., during
simulations). Say each of the first 100 tasks takes 10 times longer than
each of the next 900. Dividing the 1000 tasks equally between 10 nodes means
that the entire computation is limited by the first node, which takes
(100 x 10) / 1 = 1000 time units, versus (900 x 1) / 9 = 100 for each of
the remaining 9 nodes. Choose a smaller chunk size, e.g., 10. Each node
first gets a chunk of 10 big tasks, and when complete comes back for more.
Ignoring communication time, the elapsed time is something like
(100 x 10) / 10 = 100 (for the 100 big jobs, each taking 10 time units,
divided between 10 nodes) + (900 x 1) / 10 = 90 (for the remaining 900
small jobs, each taking 1 time unit, divided across 10 nodes), i.e. about
190 time units in total.
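
The arithmetic above can be checked with a small scheduling simulation. This is a sketch in Python under the same simplifying assumption (communication time ignored); `makespan` is a hypothetical helper that hands each chunk, in order, to whichever node becomes free first.

```python
import heapq


def makespan(task_times, n_workers, chunk_size):
    """Elapsed time when chunks are dispatched in order to the earliest-free worker."""
    chunk_costs = [sum(task_times[i:i + chunk_size])
                   for i in range(0, len(task_times), chunk_size)]
    free_at = [0] * n_workers          # time at which each worker becomes idle
    heapq.heapify(free_at)
    for cost in chunk_costs:
        earliest = heapq.heappop(free_at)
        heapq.heappush(free_at, earliest + cost)
    return max(free_at)


# 100 big tasks (10 time units each) followed by 900 small ones (1 unit each).
task_times = [10] * 100 + [1] * 900

static_split = makespan(task_times, n_workers=10, chunk_size=100)  # 1000
small_chunks = makespan(task_times, n_workers=10, chunk_size=10)   # 190
```

With one chunk of 100 per node the first node holds all the big tasks; with chunks of 10 the big tasks are spread over all ten nodes before the small ones are picked up.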

Since not explicitly mentioned elsewhere, the Rmpi mpi.*apply functions
have a job.num argument for this purpose.

Martin

  
    
#
There are some very sophisticated strategies for chunking out there in
the parallel processing world, aimed at dealing with the load balancing
problem.  The idea is to use very large chunks at the beginning and
middle stages of the computation, to minimize communication overhead and
the like, but then switch to smaller ones near the end in order to keep
all the processes busy.  For example, the "guided" option in OpenMP does
this.
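
A minimal sketch of such a decreasing-chunk schedule, in Python: each new chunk is the remaining work divided by the number of processes, down to a minimum size. This follows the usual textbook description of guided scheduling; the exact rule used by a given OpenMP implementation may differ, and `guided_chunks` is a hypothetical name.

```python
def guided_chunks(n_tasks, n_procs, min_chunk=1):
    """Return chunk sizes that start large and shrink as work runs out."""
    sizes = []
    remaining = n_tasks
    while remaining > 0:
        size = max(min_chunk, -(-remaining // n_procs))  # ceil(remaining / n_procs)
        size = min(size, remaining)                      # never exceed what is left
        sizes.append(size)
        remaining -= size
    return sizes


# First few chunk sizes for 1000 tasks on 4 processes: 250, 188, 141, ...
sizes = guided_chunks(1000, 4)
```

The early chunks keep the message count low, while the small chunks at the end give idle processes something to pick up.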

However, in my experience, it is seldom necessary to resort to this, as
load balancing is typically not a problem.  One can even show this
theoretically:  Say task times are T[1], T[2], etc., and are i.i.d.  The
standard deviation of sum(T[1:k]), divided by the mean, goes to 0 as k
goes to infinity--i.e. the sum is essentially constant.  Of course, that
is an idealized model, but again in practice I have found that load
balancing is not much of an issue.
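
The claim can be illustrated numerically. For i.i.d. task times with mean m and standard deviation s, the sum of k of them has standard deviation s*sqrt(k) and mean k*m, so the ratio is s / (m*sqrt(k)). The Python sketch below assumes exponentially distributed task times with mean 1 (an arbitrary choice for illustration, for which the ratio should be close to 1/sqrt(k)); `cv_of_sum` is a hypothetical helper.

```python
import math
import random
import statistics


def cv_of_sum(k, trials=500, seed=0):
    """Estimate sd/mean of the sum of k i.i.d. exponential(1) task times."""
    rng = random.Random(seed)
    sums = [sum(rng.expovariate(1.0) for _ in range(k)) for _ in range(trials)]
    return statistics.stdev(sums) / statistics.mean(sums)


# Compare the simulated ratio with the theoretical value 1/sqrt(k).
for k in (1, 10, 100, 1000):
    print(k, round(cv_of_sum(k), 3), round(1 / math.sqrt(k), 3))
```

The i.i.d. assumption matters here: the argument does not cover a workload where, say, all the slow tasks sit together at the start of the task list.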

For that reason and because of the communication overhead, in most cases
it is actually faster to statically assign 1/p of the tasks to each of
the p processes, i.e. not do chunking.

Norm Matloff
#
On 01/26/2010 09:51 AM, Norm Matloff wrote:
Agreed, and for the matrix operations Mark hinted at, one likely gets the
most benefit first from R's vectorized matrix operations (rather than
*apply), then from vectorized BLAS/LAPACK libraries, then from multicore.
Distributing tasks across nodes in a cluster (e.g., with MPI) for matrix
operations is probably almost always a losing proposition -- communication
costs are just too high.

Martin

  
    
#
One other approach, where the computation per chunk runs into
several (tens of) minutes, is to monitor the running time of long-running
tasks (each working on a chunk) and, if it exceeds a cutoff,
split the chunk and assign the pieces to unused (or less loaded) machines.
If a task for a particular chunk finishes earlier than some task for a
duplicated task, invalidate the latter and kill it.
Of course, the run time for a chunk should be (much) greater than the
cost of duplicating a chunk, reading it in, and starting new tasks. To
implement this, one would have to write a system which actually monitors
the running time of tasks and handles the chunking and duplication.

I think Hadoop Mapreduce does something similar, though it is most
certainly not the best tool for some tasks.

Regards
Saptarshi
On Tue, Jan 26, 2010 at 12:54 PM, Martin Morgan <mtmorgan at fhcrc.org> wrote:
#
should be 'duplicated chunk'

On Tue, Jan 26, 2010 at 1:32 PM, Saptarshi Guha
<saptarshi.guha at gmail.com> wrote: