【Hadoop基础教程】Hadoop之单词计数wordcount_usage: wordcount <in> [<in>...] <out>-程序员宅基地

技术标签: hadoop  

单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版“Hello World”,该程序的完整代码可以在Hadoop安装包的src/example目录下找到。单词计数主要完成的功能:统计一系列文本文件中每个单词出现的次数,如下图所示。本blog将通过分析WordCount源码来帮助大家摸清MapReduce程序的基本结构和运行机制。

单词计数

开发环境


硬件环境:Centos 6.5 服务器4台(一台为Master节点,三台为Slave节点)
软件环境:Java 1.7.0_45、hadoop-1.2.1

1、 WordCount的Map过程


Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。Map方法中的value值存储的是文本文件中的一行记录(以回车符为结束标记),而key值为该行的首字符相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成一个个的单词,并将

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
                String line = value.toString();
                String[] words = StringUtils.split(line," ");
                for(String word:words){
                    context.write(new Text(word), new LongWritable(1));
                }
        }
}


2、 WordCount的Reduce过程


Reduce过程需要继承org.apache.hadoop.mapreduce包中的Reduce类,并重写其reduce方法。Reduce方法的输入参数key为单个单词,而values是由各Mapper上对应单词的计数值所组成的列表,所以只要遍历values并求和,即可得到某个单词出现的总次数。
IntSumReducer类的实现代码如下,详细源码请参考:WordCount\src\WordCount.java。

public static class IntSumReducer 
   extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
                       ) throws IOException, InterruptedException {
      //输入参数key为单个单词;
      //输入参数Iterable<IntWritable> values为各个Mapper上对应单词的计数值所组成的列表。
      int sum = 0;
      for (IntWritable val : values) {//遍历求和
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);//输出求和后的<key,value>
    }
}

3、 WordCount的驱动执行过程


在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程和使用IntSumReducer完成Combine和Reduce过程。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输入和输出路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
驱动函数实现代码如下,详细源码请参考:WordCount\src\WordCount.java。

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    //设置Mapper、Combiner、Reducer方法
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    //设置了Map过程和Reduce过程的输出类型,设置key的输出类型为Text,value的输出类型为IntWritable;
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //设置任务数据的输入、输出路径;
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    //执行job任务,执行成功后退出;
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

4、 WordCount的处理过程


如上所述给出了WordCount的设计思路及源码分析过程,但很多细节都未被提及,本节将根据MapReduce的处理工程,对WordCount进行更详细的讲解。详细的执行步骤如下:
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成< key,value >对,如图所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车符所占的字符数(Windows和Linux环境下会不同)。

分割过程

2)将分割好的< key,value>对交给用户定义的map方法进行处理,生成新的< key,value >对,如图所示:

Map过程

3)得到map方法输出的< key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果,如图所示:

Map过程与Combine过程

4) Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reducer方法进行处理,得到新的< key,value>对,并作为WordCount的输出结果,如图所示:

Reduce过程

5、 WordCount的最小驱动


MapReduce框架在幕后默默地完成了很多事情,如果不重写map和reduce方法,它会不会就此罢工了?下面设计一个“WordCount最小驱动”MapReduce—LazyMapReduce,该类只对任务进行必要的初始化及输入/输出路径的设置,其余的参数(如输入/输出类型、map方法、reduce方法等)均保持默认状态。LazyMapReduce的实现代码如下:

public class LazyMapReduce {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "LazyMapReduce");
        //设置任务数据的输入、输出路径;
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        //执行job任务,执行成功后退出;
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

可以看出在默认情况下,MapReduce原封不动地将输入

6、部署运行


1)部署源码
#设置工作环境
[hadoop@K-Master ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
#部署源码
将WordCount 文件夹拷贝到/usr/hadoop/workspace/MapReduce/ 路径下;

… 你可以直接 下载 WordCount

2)编译文件

在使用javac编译命令时,我们用到了两个参数:-classpath指定编译该类所需要的核心包,-d指定编译后生成的class文件的存放路径;最后的WordCount.java表示编译的对象是当前文件夹下的WordCount.java类。

[hadoop@K-Master ~]$ cd /usr/hadoop/workspace/MapReduce/WordCount
[hadoop@K-Master WordCount]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar -d bin/ src/WordCount.java
#查看编译结果
[hadoop@K-Master WordCount]$ ls bin/ -la
总用量 12
drwxrwxr-x 2 hadoop hadoop  102 9月  15 11:08 .
drwxrwxr-x 4 hadoop hadoop   69 9月  15 10:55 ..
-rw-rw-r-- 1 hadoop hadoop 1830 9月  15 11:08 WordCount.class
-rw-rw-r-- 1 hadoop hadoop 1739 9月  15 11:08 WordCount$IntSumReducer.class
-rw-rw-r-- 1 hadoop hadoop 1736 9月  15 11:08 WordCount$TokenizerMapper.class
3)打包jar文件

在使用jar命令进行打包class文件时,我们用到了两个参数:-cvf表示打包class文件并显示详细的打包信息,-C指定打包的对象;命令最后的“.”表示将打包生成的文件保存在当前目录下。

[hadoop@K-Master WordCount]$ jar -cvf WordCount.jar -C bin/ .
已添加清单
正在添加: WordCount$TokenizerMapper.class(输入 = 1736) (输出 = 754)(压缩了 56%)
正在添加: WordCount$IntSumReducer.class(输入 = 1739) (输出 = 74

特别注意:打包命令最后一个字符为“.”,表示将打包生成的文件WordCount.jar保存到当前文件夹下,输入命令时特别留心。

4)启动Hadoop集群

如果HDFS已经启动,则不需要执行以下命令,可通过jps命令查看HDFS是否已经启动

[hadoop@K-Master WordCount]$ start-dfs.sh      #启动HDFS文件系统
[hadoop@K-Master WordCount]$ start-mapred.sh       #启动MapReducer服务
[hadoop@K-Master WordCount]$ jps
5082 JobTracker
4899 SecondaryNameNode
9048 Jps
4735 NameNode
5)上传输入文件到HDFS

在MapReduce中,一个准备提交执行的应用程序称为“作业(Job)”,Master节点将对该Job划分成多个task运行于各计算节点上(Slave节点),而task任务输入输出的数据均是基于HDFS分布式文件管理系统,故需要将输入数据上传到HDFS分布式文件管理系统之上,如下所示。

#在HDFS上创建输入/输出文件夹
[hadoop@K-Master WordCount]$ hadoop fs -mkdir wordcount/input/ 
#传本地file中文件到集群的input目录下
[hadoop@K-Master WordCount]$ hadoop fs -put input/file0*.txt wordcount/input
#查看上传到HDFS输入文件夹中到文件
[hadoop@K-Master WordCount]$ hadoop fs -ls wordcount/input
Found 2 items
-rw-r--r--   1 hadoop supergroup 22 2014-07-12 19:50 /user/hadoop/wordcount/input/file01.txt
-rw-r--r--   1 hadoop supergroup 28 2014-07-12 19:50 /user/hadoop/wordcount/input/file02.txt
6)运行Jar文件

我们通过hadoop jar命令运行一个job任务,关于该命令各个参数的含义如下图所示:

hadoop-jar

[hadoop@K-Master WordCount]$ hadoop jar WordCount.jar WordCount wordcount/input wordcount/output
14/07/12 22:06:42 INFO input.FileInputFormat: Total input paths to process : 2
14/07/12 22:06:42 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/07/12 22:06:42 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/12 22:06:42 INFO mapred.JobClient: Running job: job_201407121903_0004
14/07/12 22:06:43 INFO mapred.JobClient:  map 0% reduce 0%
14/07/12 22:06:53 INFO mapred.JobClient:  map 50% reduce 0%
14/07/12 22:06:55 INFO mapred.JobClient:  map 100% reduce 0%
14/07/12 22:07:03 INFO mapred.JobClient:  map 100% reduce 33%
14/07/12 22:07:05 INFO mapred.JobClient:  map 100% reduce 100%
14/07/12 22:07:07 INFO mapred.JobClient: Job complete: job_201407121903_0004
14/07/12 22:07:07 INFO mapred.JobClient: Counters: 29
7)查看运行结果

结果文件一般由三部分组成:
1) _SUCCESS文件:表示MapReduce运行成功。
2) _logs文件夹:存放运行MapReduce的日志。
3) Part-r-00000文件:存放结果,也是默认生成的结果文件。
使用hadoop fs -ls wordcount/output命令查看输出结果目录,如下所示:

#查看FS上output目录内容
[hadoop@K-Master WordCount]$ hadoop fs -ls wordcount/output
Found 3 items
-rw-r--r--   1 hadoop supergroup  0 2014-09-15 11:11 /user/hadoop/wordcount/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup  0 2014-09-15 11:10 /user/hadoop/wordcount/output/_logs
-rw-r--r--   1 hadoop supergroup 41 2014-09-15 11:11 /user/hadoop/wordcount/output/part-r-00000
使用 hadoop fs –cat wordcount/output/part-r-00000命令查看输出结果,如下所示:
#查看结果输出文件内容
[hadoop@K-Master WordCount]$ hadoop fs -cat wordcount/output/part-r-00000
Bye     1
Goodbye 1
Hadoop  2
Hello       2
World   2
到这里,整个MapReduce的快速入门就结束了。本篇blog使用一个完整的案例,从开发到部署再到查看结果,让大家对MapReduce的基本使用有所了解。
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/AnneQiQi/article/details/51189513

智能推荐

Python的__new__方法_python new-程序员宅基地

文章浏览阅读751次。Python的__new__方法Python的__new__和__init__的区别先看一段代码#-*- coding:utf8 -*-""">>> A()new init<__main__.A object at 0x02473A30>"""class A(object): def __init__(self): pri_python new

python的input和while循环_python while input-程序员宅基地

文章浏览阅读455次。python的input和while使用一、Python input()函数:获取用户输入的字符串Python3.x 中 input() 函数接受一个标准输入数据,返回为 string 类型。Python2.x 中 input() 相等于 eval(raw_input(prompt)),用来获取控制台的输入。raw_input() 将所有输入作为字符串看待,返回字符串类型。而 ..._python while input

java method getdeclaredmethod,带有类类型的Java反射getDeclaredMethod()-程序员宅基地

文章浏览阅读243次。I'm trying to understand Java reflecton and am encountering difficulties when working with non-Integer setter methods.As an example, how can I resolve the "getDeclaredMethod()" call below?import java...._aclass.getdeclaredmethod

MySQL8.0.17 安装及配置_rpm安装mysql8.0.17 如何设置端口-程序员宅基地

文章浏览阅读876次。MySQL8.0.17 安装及配置1. 下载2. MySQL配置2.1 初始化2.2 配置环境变量2.3 安装2.4 修改密码1. 下载官网下载:https://dev.mysql.com/downloads/mysql/选择适合自己的版本,下载完成后解压到自己的文件夹下。2. MySQL配置2.1 初始化解压后的目录并没有的my.ini文件,没关系可以自行创建在安装根目录下添加的my..._rpm安装mysql8.0.17 如何设置端口

Spring Security教程(9)---- 自定义AccessDeniedHandler_accessdeniedhandler是干嘛的-程序员宅基地

文章浏览阅读2.9w次,点赞10次,收藏7次。在Spring默认的AccessDeniedHandler中只有对页面请求的处理,而没有对Ajax的处理。而在项目开发是Ajax又是我们要常用的技术,所以我们可以通过自定义AccessDeniedHandler来处理Ajax请求。我们在Spring默认的AccessDeniedHandlerImpl上稍作修改就可以了。public class DefaultAccessDeniedHandle_accessdeniedhandler是干嘛的

Timeout of 60000ms expired before the position for partition could be determined踩坑flink消费kafka2.3.0_timeout of 6000ms steam 解决-程序员宅基地

文章浏览阅读7.4k次,点赞5次,收藏10次。flink消费kafka2.3.0,时报错,分区分配的不对Kafka Client Timeout of 60000ms expired before the position for partition could be determined在网上找了一波,没找到原因,后面,误打误撞,发现,是因为,kafka的配置文件,server.properties,使用了主机名作为配置,在server.properties中添加host.name=192.168.0.30 (当前所在服务器的i._timeout of 6000ms steam 解决

随便推点

渗透测试——信息收集之JSFinder的使用_如何在kali中安装jsfinder-程序员宅基地

文章浏览阅读3.1k次。文章目录前言一、使用注意事项二、使用步骤1.下载后可以放入Kali里面通过Python运行。2.运行JSFinder.py总结前言JSFinder可以通过爬取网站各个页面的JS文件从而获得其中包含的网站的子域名,非常的好用,下面介绍JSFinder的使用方法,文章后面会给出JSFinder的下载地址。一、使用注意事项下载后通过命令行去直接执行,前提是Kali安装了python3及以上的版本,否则不行。二、使用步骤1.下载后可以放入Kali里面通过Python运行。2.运行JSF.._如何在kali中安装jsfinder

CausalVAE: Disentangled Representation Learning via Neural Structural Causal Models_yang, m., liu, f., chen, z., shen, x., hao, j., wa-程序员宅基地

文章浏览阅读692次。文章目录概主要内容模型ELBO关于AAAYang M., Liu F., Chen Z., Shen X., Hao J. and Wang J. CausalVAE: disentangled representation learning via neural structural causal models. arXiv preprint arXiv:2004.086975, 2020.概隐变量的因果表示.主要内容我们通常希望隐变量zzz能够表示一些特别的特征, 通过改变zzz使得生成的_yang, m., liu, f., chen, z., shen, x., hao, j., wang, j. causalvae: disentan

curl中的坑_curl 弊端-程序员宅基地

文章浏览阅读284次。问题:用curl方法向远端服务器发请求,如果成功,远端服务器会返回数据,对方要求用application/x-www-form-urlencode的请求头传输请求参数的数据。一开始用的是:$header = array();header[]=′application/x−www−form−urlencode′;curlsetopt(header[] = &#x27;applicatio..._curl 弊端

读书笔记:关于wsgi、web框架和模板的总结(python)_webinfo.wsgi模板文件-程序员宅基地

文章浏览阅读795次。在后台,Http服务器做的工作就是获取http请求,解析请求,用html文件作为body部分做http响应。wsgi的定义很简单,就是要求web应用开发者实现一个函数来响应Http请求。wsgi对于web应用开发者,屏蔽了http请求、解析,使其可专注于html文件的动态生成等业务逻辑。常用的静态服务器软件Apache、Nginx、Lighttpd等,python内置了一个wsg服务器,作为开发用_webinfo.wsgi模板文件

PAT乙级真题 1010 一元多项式求导 C++实现_pat一元多项式求导 c++-程序员宅基地

文章浏览阅读266次。题目设计函数求一元多项式的导数。输入格式:以指数递降方式输入多项式非零项系数和指数(绝对值均为不超过 1000 的整数)。数字间以空格分隔。输出格式:以与输入相同的格式输出导数多项式非零项的系数和指数。数字间以空格分隔,但结尾不能有多余空格。注意“零多项式”的指数和系数都是 0,但是表示为 0 0。输入样例:3 4 -5 2 6 1 -2 0输出样例:12 3 -10 1 6 ..._pat一元多项式求导 c++

【SLAM】Ubuntu16.04下配置ORB-SLAM2_ubuntu16.04安装配置orb-slam2-程序员宅基地

文章浏览阅读2.4w次,点赞19次,收藏163次。本文记录了 ORB-SLAM2 在Ubuntu16.04下的安装过程._ubuntu16.04安装配置orb-slam2