John Norstad

Northwestern University

Academic and Research Technologies

December 8, 2009

Revised November 3, 2011

**IMPORTANT NOTICE:** I no longer do any work at all with MapReduce or Hadoop. I'm leaving this note on my web site in case people find it interesting or useful. I cannot, however, provide any support and will not answer any questions. If you send me email about this stuff, I will not answer it.

This algorithm was developed as an exercise while the author was learning MapReduce.

We present the algorithm in English, as pseudo-code, and as Java source code for Hadoop.

We assume that the reader is familiar with both matrix algebra and MapReduce.

We actually present a family of four related algorithms, which we call "strategies". The strategies have different performance characteristics and tradeoffs.

Suppose:

Matrix A has dimension IxK with elements a(i,k) for 0 <= i < I and 0 <= k < KMatrix B has dimension KxJ with elements b(k,j) for 0 <= k < K and 0 <= j < J

Then:

Matrix C = A*B has dimension IxJ with elements c(i,j) defined as:c(i,j) = sum over 0 <= k < K of a(i,k)*b(k,j)

We split A and B into blocks (sub-matrices) small enough so that a pair of blocks can be multiplied in memory on a single node in the cluster.

Let:

IB = Number of rows per A block and C block.We use the following notation for the blocks. For all:

KB = Number of columns per A block and rows per B block.

JB = Number of columns per B block and C block.NIB = number of A row and C row partitions = (I-1)/IB+1

NKB = number of A column and B row partitions = (K-1)/KB+1

NJB = number of B column and C column partitions = (J-1)/JB+1

0 <= ib < NIBDefine:

0 <= kb < NKB

0 <= jb < NJB

A[ib,kb] = The block of A consisting ofrows IB*ib through min(IB*(ib+1),I)-1B[kb,jb] = The block of B consisting of

columns KB*kb through min(KB*(kb+1),K)-1rows KB*kb through min(KB*(kb+1),K)-1C[ib,jb] = The block of C consisting of

columns JB*jb through min(JB*(jb+1),J)-1rows IB*ib through min(IB*(ib+1),I)-1C[ib,kb,jb] = A[ib,kb] * B[kb,jb]

columns JB*jb through min(JB*(jb+1),J)-1

With this notation, we have:

C[ib,jb] = sum over 0 <= kb < KB of A[ib,kb]*B[kb,jb]= sum over 0 <= kb < KB of C[ib,kb,jb](Trivial proof omitted.)

Note that:

A blocks have dimension IBxKB.

B blocks have dimension KBxJB.

C blocks have dimension IBxJB.Except for the "remainder" blocks at the bottom and right edges of A, B and C, which might have smaller dimension.

A has NIB*NKB blocks.

B has NKB*NJB blocks.

C has NIB*NJB blocks.The matrix cell A(i,j) is the block cell A[i/IB, j/JB](i mod IB, j mod JB), and similarly for B and C.

The block cell A[ib,jb](i,j) is the matrix cell A(ib*IB+i, jb*JB+j), and similarly for B and C.

Our first instinct is to use two MapReduce jobs. The first job does the block multiplications and the second job sums up the results.

Job 1 does the heavy lifting. The reducers do the block multiplications. The mappers are responsible for distributing the block data to the reducers, with the help of a carefully chosen intermediate key structure and key comparator and partitioning functions.

How should we assign the block multiplication tasks to the reducers? There are several possible strategies. We'll present them in order from the simplest to the more sophisticated.

The simplest strategy is to have each reducer do just one of the block multiplications. Reducer R[ib,kb,jb] is responsible for multiplying A[ib,kb] times B[kb,jb] to produce C[ib,kb,jb]. We have a maximum of NIB*NKB*NJB reducers multiplying blocks in parallel.

Reducers can serve double duty, so the actual number of reduce tasks R may be less than NIB*NKB*NJB, but we'll never have more reduce tasks. In other words, for different triples (ib,kb,jb) and (ib',kb',jb'), we might have R[ib,kb,jb] = R[ib',kb',jb'].

The mappers must route a copy of each A[ib,kb] block to all of the R[ib,kb,jb] reducers, for each 0 <= jb < NJB. This is NJB copies of A and a total of NJB*I*K key/value pairs, for the worst case where A is dense with no zero elements. Similarly, the mappers must route a copy of each B[kb,jb] block to all the R[ib,kb,jb] reducers, for each 0 <= ib < NIB. This is NIB copies of B for a worst-case total of NIB*K*J key/value pairs. Thus with our first strategy, we must transfer a worst-case grand total of K*(NJB*I + NIB*J) key/value pairs over the network during the sort & shuffle phase of job 1.

This strategy uses lots of reducers, which makes good use of parallelization, but it also requires a large amount of network traffic. We might prefer alternatives which use less network traffic, possibly at the expense of fewer reducers with a lower level of parallelization.

In this strategy we use a single reducer to multiply a single A block times a whole row of B blocks. That is, for each A block A[ib,kb] we use a single reducer R[ib,kb] that is responsible for multiplying the A block times all the B blocks B[kb,jb] for 0 <= jb < NJB. This involves a maximum of NIB*NKB reducers. With this stategy, the data for an A block A[ib,kb] only has to be routed to the single reducer R[ib,kb], and the data for a B block B[kb,jb] has to be routed to the NIB reducers R[ib,kb] for 0 <= ib < NIB.

The worst-case number of intermediate key/value pairs transferred over the network is I*K + NIB*K*J = K*(I+NIB*J). This is a considerable improvement over strategy 1 in terms of network traffic, at the cost of fewer reduers each of which has to do more work, resulting in a lower level of parallelization.

This is s mirror image of strategy 2. We use a single reducer to multiply a single B block times a whole column of A blocks. That is, for each B block B[kb,jb] we use a single reducer R[kb,jb] that is responsible for multiplying all the A blocks A[ib,kb] times the B block B[kb,jb] for 0 <= ib < NIB. This involves a maximum of NKB*NJB reducers. With this strategy, the data for a B block B[kb,jb] only has to be routed to the single reducer R[kb,jb], and the data for an A block A[ib,kb] has to be routed to the NJB reducers R[kb,jb] for 0 <= jb < NJB.

The worst-case number of intermediate key/value pairs transferred over the network is K*J + NJB*I*K = K*(J+NJB*I). Again, in terms of network traffic, this is a considerable improvement over strategy 1. It is an improvement over strategy 2 if and only if J+NJB*I < I+NIB*J.

This strategy was inspired by correspondence with Ted Dunning.

In the first three strategies presented above, each reducer emits one or more C[ib,kb,jb] blocks, and we have to use a second MapReduce job to sum up over kb to produce the final C[ib,jb] blocks.

In our fourth strategy we use a single reducer R[ib,jb] to compute the final C[ib,jb] block, and there's no need for a second MapReduce job. The reducer receives from the mappers all the A[ib,kb] and B[kb,jb] blocks for 0 <= kb < NKB, interleaved in the following order:

A[ib,0] B[0,jb] A[ib,1] B[1,jb] ... A[ib,NKB-1] B[NKB-1, jb]

The reducer multiplies the A and B block pairs and adds up the results. That is, it computes and emits the sum over 0 <= kb < KB of A[ib,kb]*B[kb,jb].

The maximum number of reducers with this strategy is NIB*NJB.

The mappers must route a copy of each A[ib,kb] block to all of the R[ib,jb] reducers, for each 0 <= jb < NJB. This is NJB copies of A and a total of NJB*I*K key/value pairs, for the worst case where A is dense with no zero elements. Similarly, the mappers must route a copy of each B[kb,jb] block to all the R[ib,jb] reducers, for each 0 <= ib < NIB. This is NIB copies of B for a worst-case total of NIB*K*J key/value pairs. Thus, as in our first strategy, we must transfer a worst-case grand total of K*(NJB*I + NIB*J) key/value pairs over the network during the sort & shuffle phase of job 1.

Let M = the maximum total number of matrix elements that we can comfortably store in memory and process in a single node of the cluster.

Our primary constraint is that we must be able to do a single block multiplication in memory. With strategies 1-3 an A block and a B block must both fit at the same time, but we can emit the C block elements as we compute them, so we do not need to allocate a matrix in memory for any C blocks. With strategy 4 we do need to allocate a matrix in memory to compute a C block in order to do the summing, but we do not need to allocate a B block in memory. (See the pseudo-code below for details.) So this constraint can be expressed as:

IB*KB + KB*JB <= M (strategies 1-3)IB*KB + IB*JB <= M (strategy 4)

This constraint says that there is an upper bound on the sizes of our blocks.

With strategy 1, in the worst case with dense matrices, we have:

Job 1 has K*(I+J) input pairs.

Job 1 has K*(NJB*I + NIB*J) intermediate pairs.

Job 1 has K*I*J output pairs.

Job 2 has K*I*J input pairs.

Job 2 has K*I*J intermediate pairs.

Job 2 has I*J output pairs.

With strategy 2 in the worst case we have:

Job 1 has K*(I+J) input pairs.

Job 1 has K*(I + NIB*J) intermediate pairs.

Job 1 has NIB*I*J output pairs.

Job 2 has NIB*I*J input pairs.

Job 2 has NIB*I*J intermediate pairs.

Job 2 has I*J output pairs.

With strategy 3 in the worst case we have:

Job 1 has K*(I+J) input pairs.

Job 1 has K*(J + NJB*I) intermediate pairs.

Job 1 has NJB*I*J output pairs.

Job 2 has NJB*I*J input pairs.

Job 2 has NJB*I*J intermediate pairs.

Job 2 has I*J output pairs.

With strategy 4 in the worst case we have:

Job 1 has K*(I+J) input pairs.

Job 1 has K*(NJB*I + NIB*J) intermediate pairs.

Job 1 has I*J output pairs.

There is no job 2.

All else being equal, we'd like to minimize the amount of network traffic during the sort & shuffle phases, and the size of the job 1 output/job 2 input files. To this end, with strategy 2 we'd like to minimize NIB (maximize IB), and with strategy 3 we'd like to minimize NJB (maximize JB). This concern weighs heavily against strategy 1, but if we do choose it we'd like to minimize both NIB and NJB (maximize both IB and JB). This concern makes strategy 4 attractive because it completely eliminates job 2, and its sort & shuffle phase and network traffic, and the intermediate job 1 output/job 2 input files. With strategy 4, as with strategy 1, we'd like to minimize both NIB and NJB (maximize both IB and JB).

In all four strategies, the desire to minimize intermediate traffic argues for smaller values of NIB and/or NJB (larger values of IB and/or JB). In general, to minimize intermediate traffic, we want larger blocks.

On the other hand, we'd like to exploit parallelism as much as possible. In particular, if we have an optimal number R of reducer task slots available in the cluster, we'd like to use all of them:

NIB*NJB*NKB >= R (strategy 1)

NIB*NKB >= R (strategy 2)

NJB*NKB >= R (strategy 3)

NIB*NJB >= R (strategy 4)

In all four strategies, the desire to exploit parallelism argues for larger values of NIB, NJB, and/or NKB (smaller values of IB, JB, and/or KB). In general, to exploit parallelism, we want smaller blocks.

Thus the basic tradeoff is between the degree of parallelization and the amount of intermediate data that must be transferred over the network and stored on disks.

The best choice of block sizes and strategy should take all of these concerns into consideration and will ultimately depend on the characteristics of the cluster and the input data and are best determined by experimentation.

We can imagine further variations on these themes which offer more options that might be useful for fine-tuning the kinds of tradeoffs we've been discussing.

For example, we might refine strategy 2 so that we use a single reducer to multiply each A block times a section of a row of B blocks. This would increase the number of reducers, which might better exploit parallelization, but at the expense of increasing network traffic.

A broader generalization would be to have each reducer responsible for rectangular sections of blocks. This would impose a second layer of "metablocks" on top of the blocks we have already defined.

It would be useful if the algorithm incorporated an option to make a best guess at an optimal strategy and block sizes.

We do not pursue these ideas further in this note.

We assume that the input files for A and B are streams of (key,value) pairs in sparse matrix format, where each key is a pair of indices (i,j) and each value is the corresponding matrix element value. The output files for matrix C=A*B are in the same format.

We have the following input parameters:

The path of the input file or directory for matrix A.

The path of the input file or directory for matrix B.

The path of the directory for the output files for matrix C.

strategy = 1, 2, 3 or 4.

R = the number of reducers.

I = the number of rows in A and C.

K = the number of columns in A and rows in B.

J = the number of columns in B and C.

IB = the number of rows per A block and C block.

KB = the number of columns per A block and rows per B block.

JB = the number of columns per B block and C block.

In the pseudo-code for the individual strategies below, we have intentionally avoided factoring common code for the purposes of clarity.

Note that in all the strategies the memory footprint of both the mappers and the reducers is flat at scale.

Note that the strategies all work reasonably well with both dense and sparse matrices. For sparse matrices we do not emit zero elements. That said, the simple pseudo-code for multiplying the individual blocks shown here is certainly not optimal for sparse matrices. As a learning exercise, our focus here is on mastering the MapReduce complexities, not on optimizing the sequential matrix multipliation algorithm for the individual blocks.

setup () var NIB = (I-1)/IB+1 var NKB = (K-1)/KB+1 var NJB = (J-1)/JB+1

map (key, value) if from matrix A with key=(i,k) and value=a(i,k) for 0 <= jb < NJB emit (i/IB, k/KB, jb, 0), (i mod IB, k mod KB, a(i,k)) if from matrix B with key=(k,j) and value=b(k,j) for 0 <= ib < NIB emit (ib, k/KB, j/JB, 1), (k mod KB, j mod JB, b(k,j))

Intermediate keys (ib, kb, jb, m) sort in increasing order first by ib, then by kb, then by jb, then by m. Note that m = 0 for A data and m = 1 for B data.

The partitioner maps intermediate key (ib, kb, jb, m) to a reducer r as follows:

```
r = ((ib*JB + jb)*KB + kb) mod R
```

These definitions for the sorting order and partitioner guarantee that each reducer R[ib,kb,jb] receives the data it needs for blocks A[ib,kb] and B[kb,jb], with the data for the A block immediately preceding the data for the B block.

var A = new matrix of dimension IBxKB var B = new matrix of dimension KBxJB var sib = -1 var skb = -1 reduce (key, valueList) if key is (ib, kb, jb, 0) // Save the A block. sib = ib skb = kb Zero matrix A for each value = (i, k, v) in valueList A(i,k) = v if key is (ib, kb, jb, 1) if ib != sib or kb != skb return // A[ib,kb] must be zero! // Build the B block. Zero matrix B for each value = (k, j, v) in valueList B(k,j) = v // Multiply the blocks and emit the result. ibase = ib*IB jbase = jb*JB for 0 <= i < row dimension of A for 0 <= j < column dimension of B sum = 0 for 0 <= k < column dimension of A = row dimension of B sum += A(i,k)*B(k,j) if sum != 0 emit (ibase+i, jbase+j), sum

Note that the result of the block multiplication is emitted as it is computed. There is no need to allocate memory for a matrix C to hold this product.

Note that for "remainder" blocks we may use only part of the allocated space for A and B.

setup () var NIB = (I-1)/IB+1 var NKB = (K-1)/KB+1 var NJB = (J-1)/JB+1

map (key, value) if from matrix A with key=(i,k) and value=a(i,k) emit (i/IB, k/KB, -1), (i mod IB, k mod KB, a(i,k)) if from matrix B with key=(k,j) and value=b(k,j) for 0 <= ib < NIB emit (ib, k/KB, j/JB), (k mod KB, j mod JB, b(k,j))

Intermediate keys (ib, kb, jb) sort in increasing order first by ib, then by kb, then by jb. Note that jb < 0 for A data and jb >= 0 for B data.

The partitioner maps intermediate key (ib, kb, jb) to a reducer r as follows:

```
r = (ib*KB + kb) mod R
```

These definitions for the sorting order and partitioner guarantee that reducer R[ib,kb] receives the data it needs for block A[ib,kb] and blocks B[kb,jb], with the data for the A block immediately preceding the data for the B blocks.

var A = new matrix of dimension IBxKB var B = new matrix of dimension KBxJB var sib = -1 var skb = -1 reduce (key, valueList) if key is (ib, kb, -1) // Save the A block. sib = ib skb = kb Zero matrix A for each value = (i, k, v) in valueList A(i,k) = v if key is (ib, kb, jb) with jb >= 0 if ib != sib or kb != skb return // A[ib,kb] must be zero! // Build the B block. Zero matrix B for each value = (k, j, v) in valueList B(k,j) = v // Multiply the blocks and emit the result. ibase = ib*IB jbase = jb*JB for 0 <= i < row dimension of A for 0 <= j < column dimension of B sum = 0 for 0 <= k < column dimension of A = row dimension of B sum += A(i,k)*B(k,j) if sum != 0 emit (ibase+i, jbase+j), sum

Note that the results of the block multiplications are emitted as they are computed. There is no need to allocate memory for a matrix C to hold the products.

Note that for "remainder" blocks we may use only part of the allocated space for A and B.

setup () var NIB = (I-1)/IB+1 var NKB = (K-1)/KB+1 var NJB = (J-1)/JB+1

map (key, value) if from matrix A with key=(i,k) and value=a(i,k) for 0 <= jb < NJB emit (k/KB, jb, i/IB), (i mod IB, k mod KB, a(k,j)) if from matrix B with key=(k,j) and value=b(k,j) emit (k/KB, j/JB, -1), (k mod KB, j mod KB, b(k,j))

Intermediate keys (kb, jb, ib) sort in increasing order first by kb, then by jb, then by ib. Note that ib >= 0 for A data and ib < 0 for B data.

The partitioner maps intermediate key (kb, jb, ib) to a reducer r as follows:

```
r = (jb*KB + kb) mod R
```

These definitions for the sorting order and partitioner guarantee that reducer R[kb,jb] receives the data it needs for block B[kb,jb] and blocks A[ib,kb], with the data for the B block arriving immediately prededing the data for the A blocks.

var A = new matrix of dimension IBxKB var B = new matrix of dimension KBxJB var skb = -1 var sjb = -1 reduce (key, valueList) if key is (kb, jb, -1) // Save the B block. skb = kb sjb = jb Zero matrix B for each value = (k, j, v) in valueList B(k,j) = v if key is (kb, jb, ib) with ib >= 0 if kb != skb or jb != sjb return // B[kb,jb] must be zero! // Build the A block. Zero matrix A for each value = (i, k, v) in valueList A(i,k) = v // Multiply the blocks and emit the result. ibase = ib*IB jbase = jb*JB for 0 <= i < row dimension of A for 0 <= j < column dimension of B sum = 0 for 0 <= k < column dimension of A = row dimension of B sum += A(i,k)*B(k,j) if sum != 0 emit (ibase+i, jbase+j), sum

Note that the results of the block multiplications are emitted as they are computed. There is no need to allocate memory for a matrix C to hold the products.

Note that for "remainder" blocks we may use only part of the allocated space for A and B.

setup () var NIB = (I-1)/IB+1 var NKB = (K-1)/KB+1 var NJB = (J-1)/JB+1

map (key, value) if from matrix A with key=(i,k) and value=a(i,k) for 0 <= jb < NJB emit (i/IB, jb, k/KB, 0), (i mod IB, k mod KB, a(i,k)) if from matrix B with key=(k,j) and value=b(k,j) for 0 <= ib < NIB emit (ib, j/JB, k/KB, 1), (k mod KB, j mod JB, b(k,j))

Intermediate keys (ib, jb, kb, m) sort in increasing order first by ib, then by jb, then by kb, then by m. Note that m = 0 for A data and m = 1 for B data.

The partitioner maps intermediate key (ib, jb, kb, m) to a reducer r as follows:

```
r = (ib*JB + jb) mod R
```

These definitions for the sorting order and partitioner guarantee that each reducer R[ib,jb] receives the data it needs for blocks A[ib,kb] and B[kb,jb], with the blocks arriving in interleaved order A[ib,0] B[0,jb] ... A[ib,NKB-1] B[NKB-1,jb].

var A = new matrix of dimension IBxKB var C = new matrix of dimension IBxJB var sib = -1 var sjb = -1 var skb = -1 reduce (key, valueList) // key is (ib, jb, kb, m) if ib != sib or jb != sjb // Start a new (ib,jb) sequence. if sib != -1 // Emit the C block for the previous sequence. ibase = sib*IB jbase = sjb*JB for 0 <= i < row dimension of C for 0 <= j < column dimension of C v = C(i,j) if v != 0 emit (ibase+i, jbase+j), v sib = ib sjb = jb skb = -1 Zero matrix C if key is (ib, jb, kb, 0) // Save the A block. skb = kb Zero matrix A for each value = (i, k, v) in valueList A(i,k) = v if key is (ib, jb, kb, 1) if kb != skb return // A[ib,kb] must be zero! // Multiply the A and B blocks and add them into C. for each value = (k, j, v) in valueList for 0 <= i < row dimension of A C(i,j) += A(i,k)*v

At the end of the reducer task we must emit the last C block.

cleanup () if sib != -1 // Emit the last C block. ibase = sib*IB jbase = sjb*JB for 0 <= i < row dimension of C for 0 <= j < column dimension of C v = C(i,j) if v != 0 emit (ibase+i, jbase+j), v

Note that the results of the block multiplications must be summed before being emitted, so we need to allocate memory for a matrix C that is used to do the summation. We do not, however, need to allocate memory for a matrix B.

Note that for "remainder" blocks we may use only part of the allocated space for A and C.

Job 2 is the same for strategies 1-3.

Identity mapper

Sum reducer

Combiner = reducer

There is no job 2 for strategy 4.

Note that we use a combiner in job 2. How much good might it do in terms of reducing the network traffic during the sort & shuffle phase? Consider strategy 1 as an example (strategies 2 and 3 have a similar analysis). Each job 1 reducer R[ib,kb,jb] emits key/value pairs of the form ((i,j), v). A quick analysis of the algorithm reveals that within this output all the (i,j) pairs are unique. But reducer tasks serve double duty if R < NIB*NKB*NJB, and in this case it is possible that the output file generated by a reducer task can contain more than one key/value pair for the same key pair (i,j).

An identity mapper task in job 2 reads from an input split that is all or part of a single block of a job 1 reducer output file. Thus a combiner for such a mapper will find something to combine only in the case where the reducer was serving double duty. Even in this case, because of the way the reducers emit their output, the combiner may not have much if anything to do.

Thus using the combiner in job 2 may be more trouble than it's worth, and it might even hurt performance. This is an open question that deserves further research and experimentation.

The Java code uses the new MapReduce API introduced in Hadoop 0.20.

The code has been tested with Hadoop running in standalone mode. It has not been tested on an actual Hadoop cluster.