15.3 Mapping Tasks to Processors

Being able to decompose a problem is only the first step. You'll also need to map the individual tasks to different processors in your cluster. This is largely a matter of developing appropriate control structures and communication strategies. Since the ultimate goal is to reduce the time to completion, task mapping is a balancing act between two conflicting subgoals: the need to maximize concurrency and the need to minimize the overhead introduced with concurrency. This overhead arises primarily from interprocess communications, from process idle time, and, to a lesser extent, from redundant calculations.

Consider redundant calculations first. When we separate a program into multiple tasks, the separation may not always go cleanly. Consequently, it may be necessary for each process to do redundant calculations, that is, calculations that could have been done once by a single process. Usually, this doesn't add to the program's overall time to completion since the rest of the processes would have been idle while a single process did the calculation. In fact, having the individual processors each do the calculation may be more efficient since it eliminates the communication overhead that would be required to distribute the results of the calculation. However, this is not always the case, particularly with asymmetric processes. You should be aware of this possibility.

15.3.1 Communication Overhead

Communication overhead is a more severe problem. Returning to the matrix multiplication example, while we might obtain maximum concurrency by having a different processor for each of the 100 multiplications, the overhead of distributing the matrix elements and collecting the results would more than eliminate any savings garnered from distributing the multiplications. On the other hand, if we want to minimize communication overhead, we could package everything in one process. While this would eliminate any need for communication, it would also eliminate all concurrency. With most problems, the best solution usually (but not always) lies somewhere between maximizing concurrency and minimizing communication.
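The tradeoff can be made concrete with a toy cost model. The sketch below is only an illustration under stated assumptions: the computation divides evenly among the processes, and each process beyond the first adds a fixed communication cost. The function names and the model itself are hypothetical and are not taken from the matrix example.

```c
#include <assert.h>

/* Hypothetical model: total_work seconds of computation divided evenly
   among p processes, plus overhead seconds of communication for every
   process beyond the first. */
double estimated_time(double total_work, double overhead, int p)
{
    assert(p >= 1);
    return total_work / p + overhead * (p - 1);
}

/* Return the process count in 1..max_p with the smallest estimated time. */
int best_process_count(double total_work, double overhead, int max_p)
{
    int p, best = 1;

    assert(max_p >= 1);
    for (p = 2; p <= max_p; p++)
        if (estimated_time(total_work, overhead, p) <
            estimated_time(total_work, overhead, best))
            best = p;
    return best;
}
```

With 100 seconds of work and 1 second of overhead per additional process, this model puts the minimum at 10 processes. Both one process and 100 processes come out at 100 seconds, which is the point of the paragraph above: neither extreme wins.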

In practice, you'll need to take an iterative approach to find the right balance between these two extremes. It may take several tries to work out the details. Three factors are worth examining: task size, task generation, and communication patterns. The most important is task size. Keep in mind that tasks may be uniform, i.e., all the same size, or nonuniform. Decomposing into uniform pieces will usually minimize idle time, but this isn't always true. First, you will need to be able to distribute data efficiently so that some processes aren't waiting. Second, if some of the compute nodes are faster than others or if some are more heavily loaded, the benefit of uniformity can be lost, and uniformity may even become a disadvantage.

Some tasks are inherently nonuniform. Consider searching through an array of data for an item. In one instance, you may be able to find the item very quickly. In another instance, it may take much longer. If two processes are sorting data, depending on the algorithm, the one that receives a nearly sorted set of data may have a tremendous advantage over similar processes sorting a highly random set of data.
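The search case is easy to see in a few lines of C. This is a minimal sketch, with a hypothetical helper name, that counts the comparisons a linear search makes. Two tasks given arrays of the same size can do very different amounts of work depending on where the item sits.

```c
#include <assert.h>
#include <stddef.h>

/* Count the comparisons a linear search makes before it finds key.
   Returns n if key is not present. */
size_t search_steps(const int data[], size_t n, int key)
{
    size_t i;

    assert(data != NULL);
    for (i = 0; i < n; i++)
        if (data[i] == key)
            return i + 1;
    return n;
}
```

A process whose slice holds the item in the first position finishes after one comparison, while a process whose slice holds it in the last position scans the entire slice.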

In addition to task size, there is the issue of task generation. For some problems, task generation is clearly defined. Task generation is said to be static for these problems. For example, if we want to sort a million numbers, we can clearly determine in advance how we want to generate the tasks. But not all problems are static. Consider the problem of playing chess. The boards you will want to consider will depend on a number of factors that vary from game to game, so they aren't known in advance. Both the number and the size of the tasks will depend on how the pieces are positioned on the board. For such problems, task generation is said to be dynamic.

A third consideration is the communication pattern that the problem will generate. Like tasks, communications may be static (the pattern is known in advance) or dynamic. In general, static communication is easier to program since dynamic communication tends to be unpredictable and error prone.

When programming, there are several very straightforward ways to minimize the impact of communications. First, try to reduce the volume of the data you send. Avoid sending unnecessary data. Can one process duplicate a calculation more efficiently than a pair of processes can exchange a value? Next, try to minimize the number of messages sent. If possible, package data so that it can be sent in a single message rather than as a series of messages. Look for hotspots in your communication pattern. When possible, overlap communications with computation to minimize network congestion. Finally, when feasible, use the collective operations in your message-passing library to optimize communication.
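The advice to package data into fewer messages follows from a common approximation in which each message costs a fixed startup latency plus a transfer time proportional to its size. The sketch below assumes that model. The function names and parameters are hypothetical, and real networks are more complicated.

```c
#include <assert.h>

/* Hypothetical cost of one message: a fixed startup latency plus the
   time to move the payload at a given bandwidth. */
double message_time(double latency, double bytes_per_second, double bytes)
{
    assert(bytes_per_second > 0.0 && bytes >= 0.0);
    return latency + bytes / bytes_per_second;
}

/* Cost of sending count items as count separate messages. */
double separate_time(double latency, double bytes_per_second,
                     int count, double bytes_each)
{
    assert(count >= 0);
    return count * message_time(latency, bytes_per_second, bytes_each);
}

/* Cost of sending the same count items packed into a single message. */
double packed_time(double latency, double bytes_per_second,
                   int count, double bytes_each)
{
    assert(count >= 0);
    return message_time(latency, bytes_per_second, count * bytes_each);
}
```

Under this model, the packed message pays the latency once instead of once per item, so the savings grow with the number of small messages that are combined.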

There are a number of other important questions that need to be answered to fully characterize communication patterns. Do all the processes need to communicate with each other or can communication be managed through a single process? Then there is the issue of communication timing, i.e., is communication synchronized? Can all the data be distributed at once, or will it be necessary to update the data as the program runs? Is communication unidirectional or bidirectional? What is the source and destination for data, i.e., does it come from another process, is it sent to another process, or is the filesystem used? There are no right or wrong answers to these questions, but you do need to know the answers to understand what's going on.

15.3.2 Load Balancing

As previously noted, idle time is a major source of overhead. The best way to minimize idle time is to balance the computing requirements among the available processors. There are several sources of idle time in parallel programs. One source is a mismatch between tasks and processors. If you try to run five processes on four processors, two of the processes will be competing for the same processor and will take twice as long as the other processes. Another source of idle time is nonuniform tasks as shown in Figure 15-4. Differences in processor speeds, memory, or workload on cluster nodes can also result in some processes taking longer than expected to complete, leaving other processes idle as they wait to send data to or receive data from those processes.

One way to minimize the overhead resulting from idle time is load balancing. Depending on the context, load balancing can mean different things. In the larger context of operating systems, load balancing may mean running different programs or processes on different machines. In the current context of parallel programming, it refers to a technique of breaking a program into tasks and distributing those tasks based on processor availability.

An example should help. Suppose you have 100 nodes in your cluster, some fast and some slow. If you divide your problem into 100 tasks and send one task to each node, then you won't finish until the slowest, most heavily loaded node finishes. If, however, you divide your problem into 1,000 tasks and write your code so that when a processor finishes one task it receives another, the faster and less loaded processors can take on a larger share of the work while the slower processors will do less. If all goes well, you will finish quicker.
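The effect can be checked with a small serial simulation. This is a rough sketch that ignores communication costs entirely. The names, the MAX_NODES limit, and the assumption that every task takes cost[i] seconds on node i are illustrative choices, not part of the example above.

```c
#include <assert.h>

#define MAX_NODES 64   /* arbitrary limit for this sketch */

/* Static mapping: tasks are dealt out evenly, and node i needs cost[i]
   seconds per task. Returns the finish time of the slowest node. */
double static_finish_time(const double cost[], int nodes, int tasks)
{
    double worst = 0.0;
    int i;

    assert(nodes >= 1 && tasks % nodes == 0);
    for (i = 0; i < nodes; i++) {
        double t = cost[i] * (tasks / nodes);
        if (t > worst)
            worst = t;
    }
    return worst;
}

/* Work pool: each task goes to whichever node becomes free first. */
double pool_finish_time(const double cost[], int nodes, int tasks)
{
    double free_at[MAX_NODES] = {0.0};
    double finish = 0.0;
    int i, t;

    assert(nodes >= 1 && nodes <= MAX_NODES);
    for (t = 0; t < tasks; t++) {
        int next = 0;
        for (i = 1; i < nodes; i++)
            if (free_at[i] < free_at[next])
                next = i;
        free_at[next] += cost[next];      /* node is busy until then */
        if (free_at[next] > finish)
            finish = free_at[next];
    }
    return finish;
}
```

For two nodes that need 1 and 3 seconds per task and a total of eight tasks, the static mapping finishes at 12 seconds while the pool finishes at 6, because the fast node ends up doing six of the eight tasks.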

This is the basic idea behind a work pool. The work is distributed by maintaining a pool of tasks that are sent to processors whenever a processor becomes idle. Typically, a master-slave arrangement is used: one process (sometimes more) acts as a master, distributing work and collecting results, while the remaining processes act as slaves that process a single task, return the results to the master, and wait for their next task. Typically, slaves are idle only toward the end of the program's execution when there are fewer uncompleted tasks than slaves.

In order to use a work pool effectively, you need to reduce the granularity of your tasks so that you have more tasks than slaves. The key issue, when reducing the granularity, is at what point communication overhead begins to outweigh the benefits of reduced idle time. In general, a work pool works best when the communication overhead is small compared to the amount of computing needed. You should also be aware that the master process can become a bottleneck if it must deal with too many tasks. This may happen if the task size is too small.

Here is the numerical integration problem rewritten using a master-slave, work pool approach.

#include "mpi.h"
#include <stdio.h>

/* problem parameters */
#define f(x)            ((x) * (x))

int main(int argc, char *argv[])
{
   /* MPI variables */
   int dest, noProcesses, processId;
   MPI_Status status;

   /* problem variables */
   int         i, chunk, numberChunks, numberRects;
   double      area, at, height, lower, width, total, range;
   double      lowerLimit, upperLimit;

   /* MPI setup */
   MPI_Init(&argc, &argv);
   MPI_Comm_size(MPI_COMM_WORLD, &noProcesses);
   MPI_Comm_rank(MPI_COMM_WORLD, &processId);

   if (processId == 0)         /* if rank is 0, collect parameters */
   {
      fprintf(stderr, "Enter number of chunks to divide problem into:\n");
      scanf("%d", &numberChunks);
      fprintf(stderr, "Enter number of steps per chunk:\n");
      scanf("%d", &numberRects);
      fprintf(stderr, "Enter low end of interval:\n");
      scanf("%lf", &lowerLimit);
      fprintf(stderr, "Enter high end of interval:\n");
      scanf("%lf", &upperLimit);
   }

   /* every process needs the problem parameters */
   MPI_Bcast(&numberChunks, 1, MPI_INT, 0, MPI_COMM_WORLD);
   MPI_Bcast(&numberRects, 1, MPI_INT, 0, MPI_COMM_WORLD);
   MPI_Bcast(&lowerLimit, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
   MPI_Bcast(&upperLimit, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

   /* if rank is 0, assign chunks, collect results, print results */
   if (processId == 0)
   {  total = 0.0;
      /* slaves start on chunks 1..noProcesses-1 without being told; */
      /* chunk holds the last chunk handed out, or 0 if none remain  */
      if (noProcesses - 1 < numberChunks) chunk = noProcesses - 1;
      else chunk = 0;
      for (i = 1; i <= numberChunks; i++)
      {  MPI_Recv(&area, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG,
                  MPI_COMM_WORLD, &status);
         fprintf(stderr, "Area for process %d, is: %f\n",
                 status.MPI_SOURCE, area);
         total = total + area;
         /* reply to the sender with its next chunk, or 0 to stop */
         if (chunk != 0 && chunk < numberChunks) chunk++;
         else chunk = 0;
         MPI_Send(&chunk, 1, MPI_INT, status.MPI_SOURCE, chunk,
                  MPI_COMM_WORLD);
      }
      fprintf(stderr, "The area from %f to %f is: %f\n",
              lowerLimit, upperLimit, total);
   }
   /* all other processes, calculate area for chunk and send results */
   else
   {
      if (processId > numberChunks) chunk = 0; /* too many processes */
      else chunk = processId;
      while (chunk != 0)
      {  /* adjust problem size for subproblem */
         range = (upperLimit - lowerLimit) / numberChunks;
         width = range / numberRects;
         lower = lowerLimit + range * (chunk - 1);

         /* calculate area for this chunk */
         area = 0.0;
         for (i = 0; i < numberRects; i++)
         {  at = lower + i * width + width / 2.0;
            height = f(at);
            area = area + width * height;
         }

         /* send results and get next chunk */
         dest = 0;
         MPI_Send(&area, 1, MPI_DOUBLE, dest, processId, MPI_COMM_WORLD);
         MPI_Recv(&chunk, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
                  &status);
      }
   }

   /* finish */
   MPI_Finalize();
   return 0;
}

There are two major sets of changes to this code. First, the number of regions (numberChunks) is now a parameter entered by the user. Previously, we divided the problem into the same number of regions as processors, i.e., each processor had its own well-defined region to evaluate. Now the total number of regions exceeds (or should exceed) the number of processes. The total number of regions is broadcast to each process so that the process can go ahead and begin calculating the area for its first region.

Process 0 is the master process and no longer calculates the area for a region. Rather, it keeps track of what needs to be done, assigns work, and collects results. All remaining processes are slaves and do the actual work. If one of these is heavily loaded, it may only calculate the area of one region while other, less-loaded nodes may calculate the area of several regions. Notice that a value of 0 for chunk signals a slave that no more regions need to be calculated.
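The arithmetic each slave performs and the bookkeeping the master performs can be checked serially, without MPI. The sketch below lifts both into ordinary functions. The names chunk_area and next_chunk are hypothetical helpers introduced for this illustration, and f is written as a function rather than a macro.

```c
#include <assert.h>

/* Integrand, standing in for the f(x) macro in the listing. */
static double f(double x) { return x * x; }

/* Midpoint-rule area of one chunk, using the same arithmetic as the
   slave loop. Chunk numbers run from 1 to numberChunks. */
double chunk_area(double lowerLimit, double upperLimit,
                  int numberChunks, int numberRects, int chunk)
{
    double range, width, lower, area = 0.0;
    int i;

    assert(numberChunks >= 1 && numberRects >= 1);
    assert(chunk >= 1 && chunk <= numberChunks);
    range = (upperLimit - lowerLimit) / numberChunks;
    width = range / numberRects;
    lower = lowerLimit + range * (chunk - 1);
    for (i = 0; i < numberRects; i++)
        area += width * f(lower + i * width + width / 2.0);
    return area;
}

/* The master's bookkeeping: given the last chunk handed out, return the
   next one to assign, or 0 when nothing is left. */
int next_chunk(int chunk, int numberChunks)
{
    assert(chunk >= 0 && numberChunks >= 1);
    if (chunk != 0 && chunk < numberChunks)
        return chunk + 1;
    return 0;
}
```

Summing chunk_area over chunks 1 through numberChunks should reproduce the total that the master accumulates. With three slaves and five chunks, the master starts with chunk set to 3, so its successive replies carry 4, 5, 0, 0, and 0: two further assignments and one stop signal for each slave.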

