《MapReduce实现大矩阵乘法(附源码及详细过程》由会员分享,可在线阅读,更多相关《MapReduce实现大矩阵乘法(附源码及详细过程(14页珍藏版)》请在金锄头文库上搜索。
1、报告题目: MapReduce实现大矩阵乘法 1、题目要求题目介绍:请使用MapReduce编程模型实现矩阵相乘。要求如下:1.按照MapReduce框架,对该问题进行剖析,并给出解决思路,要求图文并茂。2.展示运行代码并要求注释清晰。3.展示算法运行性能(不需要打印最终的矩阵输出结果图),要求分别给出在103,104,105维度的矩阵乘法运行时间,mapworker和reduceworker的实时运行数量等。2、系统环境操作系统环境:Linux Centos6.5Hadoop环境:hadoop 2.6.5 完全分布式系统(1个名称节点,1个数据节点)3、解决方案3.1 算法分析假设有两个矩阵
2、A和矩阵B,如下图所示:矩阵乘法的数学公式如下:下图是两个矩阵A和B:A,B在文件中的存储可以是每一行元素占文件的一行,如下图:也可以是列表的形式,如下图:考虑到大矩阵的稀疏且数据量大,HDFS中的文件大小有限制,无法保证一个矩阵只保存在一个文件中,因此采用列表的方式来存储矩阵,其格式为。在Map阶段,Mapper按行读取矩阵A,B所在文件的数据,并将其转化为某种格式的键值对保存到上下文。在Shffle阶段,Hadoop自动将相同key的value合并成的格式,再将该格式的数据传递给reduce。在Reduce阶段,Reducer接收Shuffle传递过来的,处理数据得到最终的结果,并将结果输
3、出到目标矩阵文件中。这里的问题关键在于,如何设计MapperShffleReducer之间传递的键值对数据。考虑从目标矩阵出发,将Reducer处理的键值对的中的key设置为目标矩阵的行坐标和列坐标,即”i, j”;那么Iterable(values)迭代器中的数据就应该是得到Cij需要的数据,即得到这些数据还需要标注其来自那个矩阵,另外根据公式:相乘的数据需要对应,因此,还需要标记其位置,因此有如下设计:矩阵A: = key:“目标矩阵的行序, 目标矩阵的列序”, value:“源矩阵a, 目标矩阵的行序, 源矩阵a的值”如:key:”1,1”, value:”a,1,1”矩阵B: = ke
4、y:“目标矩阵的行序, 目标矩阵的列序”, value:“源矩阵b, 标矩阵的列序, 源矩阵b的值”如:key:”1,1”, value:”b,1,10”。因此,该格式也是Mapper传递给Shuffle的键值对格式, Shuffle再将相同key的元素合并,得到计算Ci,j的所有数据及计算顺序,Mapper读取键值对,从迭代器中读取出每一项value,从而计算Ci,j。Mapper读矩阵所在的文件,得到的键值对格式为:矩阵A: = key:“行号”,value:“i, j, Aij”矩阵B: = key:“行号”,value:“i, j, Bij”如果要得到Mapper的输出格式: = ke
5、y:“目标矩阵的行序, 目标矩阵的列序”, value:“源矩阵a, 目标矩阵的行序, 源矩阵a的值”只需要得到目标矩阵的列序即可输出,而列序可以通过循环得到,因为该元素需要参与目标矩阵对应行的所有目标列的计算。3.2 算法图示矩阵在文件中的存储格式如下:第一行为矩阵的形状 在mapper之前,先判断两个矩阵是否能够进行乘法运算。3.3 代码一、矩阵乘法代码如下:package main.matrix;import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.F
6、ileNotFoundException;import java.io.IOException;import java.io.InputStreamReader;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FSDataInputStream;import org.apach
7、e.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import
8、 org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;imp
9、ort org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class Matrix_Multi extends Configured implements Toolpublic static class MatrixMapper extends Mapper private String flag = null;static int m = 0;/ 矩阵A的行数private int n = 0;/ 矩阵A的列数static int p = 0;/ 矩阵B的列数private int q =
10、0;/ 矩阵B的行数private int a=0, b=0;/矩阵A,B文件的行数记录器/计数器enum CounterLINESKIP, /出错的行Overrideprotected void setup(Context context) throws IOException,InterruptedException FileSplit split = (FileSplit) context.getInputSplit();flag = split.getPath().getName();/* * map阶段:读取矩阵a和b, 将矩阵中的元素按指定格式存入上下文 * 得到的键值对格式为:
11、key: value_1, value_2 . value_n */Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException tryString val = value.toString().split(,);System.out.println(map:t + flag + t + value.toString();if (test_ma.txt.equals(flag) a + ;if(a = 1)/第一行为矩阵形状ret
12、urn;/矩阵B的列数循环,表示当前元素要参与运算的次数以及运算的目标位置/写入上下文的键值对格式为 key:“目标矩阵的行序, 目标矩阵的列序”, value:源矩阵a, 目标矩阵的行序, 源矩阵a的值for (int i = 1; i = p; i+) context.write(new Text(val0 + , + i), new Text(a,+ val1 + , + val2);/写入到上下文中的形式key:行号System.out.println(key: + new Text(val0 + , + i).toString() + tvalue: + new Text(a,+ v
13、al1 + , + val2).toString(); else if (test_mb.txt.equals(flag) b + ;if(b = 1)/第一行为矩阵形状return;/矩阵A的行数循环,表示当前元素要参与运算的次数以及运算的目标位置/写入上下文的键值对格式为 key:“目标矩阵的行序, 目标矩阵的列序”, value:源矩阵b, 标矩阵的列序, 源矩阵b的值for (int i = 1; i = m; i+) context.write(new Text(i + , + val1), new Text(b,+ val0 + , + val2);System.out.println(key: + new Text(i + , + val1).toString() + tvalue: + new Text(b,+ val0 + , + val2).toString();catch(java.lang.ArrayIndexOutOfBoundsException e)/ 记录不符合输入规则的行context.getCounter(Counter.LINESKIP).increment(1);return;