您的位置 首页 java

MapReduce编程实例:单词计数

文章来源:加米谷大数据

本文介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。

任务准备

单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。
在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词计数的基本思路和具体执行过程。下面将介绍如何编写具体实现代码及如何运行程序。
首先,在本地创建 3 个文件:file00l、file002 和 file003,文件具体内容如表 1 所示。

再使用 HDFS 命令创建一个 input 文件目录。

 hadoop fs -mkdir input  

然后,把 file001、file002 和 file003 上传到 HDFS 中的 input 目录下。

 hadoop fs -put file001 input
hadoop fs -put file002 input
hadoop fs -put file003 input  

编写 MapReduce 程序的第一个任务就是编写 Map 程序。在单词计数任务中,Map 需要完成的任务就是把输入的文本数据按单词进行拆分,然后以特定的键值对的形式进行输出。

编写 Map 程序

Hadoop MapReduce 框架已经在类 Mapper 中实现了 Map 任务的基本功能。为了实现 Map 任务,开发者只需要继承类 Mapper,并实现该类的 Map 函数。
为实现单词计数的 Map 任务,首先为类 Mapper 设定好输入类型和输出类型。这里,Map 函数的输入是 <key,value> 形式,其中,key 是输入文件中一行的行号,value 是该行号对应的一行内容。
所以,Map 函数的输入类型为 <LongWritable,Text>。Map 函数的功能为完成文本分割工作,Map 函数的输出也是 <key,value> 形式,其中,key 是单词,value 为该单词出现的次数。所以,Map 函数的输出类型为 <Text,LongWritable>。
以下是单词计数程序的 Map 任务的实现代码。

 public static class CoreMapper extends Mapper<Object,Text,Text,IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private static Text label = new Text();
    public void map(Object key,Text value,Mapper<Object,Text,Text,IntWritable> Context context)throws IOException,InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while(tokenizer.hasMoreTokens()) {
            label.set(tokenizer.nextToken());
            context.write(label,one);
        }
    }
}  

在上述代码中,实现 Map 任务的类为 CoreMapper。该类首先将需要输出的两个变量 one 和 label 进行初始化。

  • 变量 one 的初始值直接设置为 1,表示某个单词在文本中出现过。
  • Map 函数的前两个参数是函数的输入参数,value 为 Text 类型,是指每次读入文本的一行,key 为 Object 类型,是指输入的行数据在文本中的行号。

StringTokenizer 类机器方法将 value 变量中文本的一行文字进行拆分,拆分后的单词放在 tokenizer 列表中。然后程序通过循环对每一个单词进行处理,把单词放在 label 中,把 one 作为单词计数。
在函数的整个执行过程中,one 的值一直是 1。在该实例中,key 没有被明显地使用到。context 是 Map 函数的一种输出方式,通过使用该变量,可以直接将中间结果存储在其中。
根据上述代码,Map 任务结束后,3 个文件的输出结果如表 2 所示。

MapReduce编程实例:单词计数

编写 Reduce 程序

编写 MapReduce 程序的第二个任务就是编写 Reduce 程序。在单词计数任务中,Reduce 需要完成的任务就是把输入结果中的数字序列进行求和,从而得到每个单词的出现次数。
在执行完 Map 函数之后,会进入 Shuffle 阶段,在这个阶段中,MapReduce 框架会自动将 Map 阶段的输出结果进行排序和分区,然后再分发给相应的 Reduce 任务去处理。经过 Map 端 Shuffle 阶段后的结果如表 3 所示。

MapReduce编程实例:单词计数

Reduce 端接收到各个 Map 端发来的数据后,会进行合并,即把同一个 key,也就是同一单词的键值对进行合并,形成<key, <V1, V2, .. Vn>> 形式的输出。经过 Map 端 Shuffle 阶段后的结果如表 4 所示。

Reduce 阶段需要对上述数据进行处理从而得到每个单词的出现次数。从 Reduce 函数的输入已经可以理解 Reduce 函数需要完成的工作,就是首先对输入数据 value 中的数字序列进行求 和。以下是单词计数程序的 Reduce 任务的实现代码。

 public static class CoreReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable count = new IntWritable ();
    public void reduce(Text key,Iterable<IntWritable> values,Reducer<Text,IntWritable, Text,IntWritable> Context context)throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable intWritable : values){
            sum += intWritable.get();
        }
        count.set(sum);
        context.write(key, count);
    }
}  

与 Map 任务实现相似,Reduce 任务也是继承 Hadoop 提供的类 Reducer 并实现其接口。 Reduce 函数的输入、输出类型与 Map 函数的输出类型本质上是相同的。
在 Reduce 函数的开始部分,首先设置 sum 参数用来记录每个单词的出现次数,然后遍历 value 列表,并对其中的数字进行累加,最终就可以得到每个单词总的出现次数。在输出的时候,仍然使用 context 类型的变量存储信息。当 Reduce 阶段结束时,就可以得到最终需要的结果,如表 5 所示。

编写 main 函数

为了使用 CoreMapper 和 CoreReducer 类进行真正的数据处理,还需要在 main 函数中通过 Job 类设置 Hadoop MapReduce 程序运行时的环境变量,以下是具体代码。

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.printIn("Usage:wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job (conf, "WordCount"); //设置环境参数
    job.setJarByClass (WordCount.class); //设置程序的类名
    job.setMapperClass(CoreMapper.class); //添加 Mapper 类
    job.setReducerClass(CoreReducer.class); //添加 Reducer类
    job.setOutputKeyClass (Text.class); //设置输出 key 的类型
    job.setOutputValueClass (IntWritable.class);  
    //设置输出 value 的类型
    FileInputFormat.addInputPath (job, new Path (otherArgs [0]));
    //设置输入文件路径
    FileOutputFormat.setOutputPath (job,new Path (otherArgs [1]));
    //设置输入文件路径
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}  

代码首先检查参数是不是正确,如果不正确就提醒用户。随后,通过 Job 类设置环境参数,并设置程序的类、Mapper 类和 Reducer 类。然后,设置了程序的输出类型,也就是 Reduce 函数的输出结果 <key,value> 中 key 和 value 各自的类型。最后,根据程序运行时的参数,设置输入、输出文件路径。

核心代码包

编写 MapReduce 程序需要引用 Hadoop 的以下几个核心组件包,它们实现了 Hadoop MapReduce 框架。

 import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;  

这些核心组件包的基本功能描述如表 6 所示。

运行代码

在运行代码前,需要先把当前工作目录设置为 /user/local/Hadoop。编译 WordCount 程序需要以下 3 个 Jar,为了简便起见,把这 3 个 Jar 添加到 CLASSPATH 中。

 $export
CLASSPATH=/usr/local/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar:$CLASSPATH
$export
CLASSPATH=/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-2.7.3.jar:$CLAS SPATH
$export
CLASSPATH=/usr/local/hadoop/share/hadoop/common/lib/common-cli-1.2.jar:$CLASSPATH  

使用 JDK 包中的工具对代码进行编译。

 $ javac WordCount.java  

编译之后,在文件目录下可以发现有 3 个“.class”文件,这是 Java 的可执行文件,将它们打包并命名为 wordcount.jar。

 $ jar -cvf wordcount.jar *.class  

这样就得到了单词计数程序的 Jar 包。在运行程序之前,需要启动 Hadoop 系统,包括启动 HDFS 和 MapReduce。然后,就可以运行程序了。

 $ ./bin/Hadoop jar wordcount.jar WordCount input output  

最后,可以运行下面的命令查看结果。

 $ ./bin/Hadoop fs -cat output/*  

文章来源:智云一二三科技

文章标题:MapReduce编程实例:单词计数

文章地址:https://www.zhihuclub.com/176226.shtml

关于作者: 智云科技

热门文章

网站地图