WordCount 案例,wordcount案例
WordCount 案例,wordcount案例
1需求 1: 统计一堆文件中单词出现的个数
0 需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
1 数据准备:hello.txt
hadoop spring java
java spark
hadoop java spring
hive zookeeper solr
spark strom
hadoop spring java
java spark
hadoop java spring
hive zookeeper solr
spark strom
2 分析
按照 mapreduce 编程规范,分别编写 Mapper, Reducer, Driver。
3 编写程序
1) 编写 mapper 类
package com.da.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 输入的key LongWritable 行号
* 输入的value Text 一行内容
* 输出的key Text 单词
* 输出的value IntWritable 单词的个数
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text k = new Text();
private IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 一行内容转换成string
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 循环写出到下一个阶段
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
2) 编写 reducer 类
package com.da.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 1 统计单词总个数
int sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出单词总个数
v.set(sum);
context.write(key, v);
}
}
3) 编写驱动类
package com.da.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordcountDriver {
public static void main(String[] args) throws Exception {
// 1 获取job信息
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 获取jar包位置
job.setJarByClass(WordcountDriver.class);
// 3 关联自定义的mappper和reducer
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置map输出数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置数据输入和输出文件路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交代码
// job.submit();
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
4 集群上测试
1) 将程序打成 jar 包,然后拷贝到 hadoop 集群中。
2) 启动 hadoop 集群
3) 执行 wordcount 程序
[joker@hadoop102 hadoop-2.7.2]# ls
bin data etc include input lib libexec LICENSE.txt logs NOTICE.txt output README.txt sbin share wait.sh wcinput wc.jar wcoutput
[joker@hadoop102 hadoop-2.7.2]# hadoop fs -put wcinput/hello.txt /user/joker/input/
[joker@hadoop102 hadoop-2.7.2]# hadoop jar wc.jar com.da.mr.WordcountDriver /user/joker/input /user/joker/output
[joker@hadoop102 hadoop-2.7.2]# hadoop fs -cat /user/joker/output/part-r-00000
hadoop 4
hive 2
java 6
solr 2
spark 4
spring 4
strom 2
zookeeper 2
5 本地测试
1)在 windows 环境上配置 HADOOP_HOME 环境变量。
2)在 eclipse 上运行程序,设置好输入参数运行main
3) 注意:如果 eclipse 打印不出日志, 在控制台上只显示
1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
2.log4j:WARN Please initialize the log4j system properly.
3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
需要在项目的 src 目录下,新建一个文件,命名为“log4j.properties”, 在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
2 需求 2:把单词按照 ASCII 码奇偶分区(Partitioner)
0 分析
1 自定义分区
package com.da.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// 1 获取单词 key
String firWord = key.toString().substring(0, 1);
char[] charArray = firWord.toCharArray();
int result = charArray[0];
// int result = key.toString().charAt(0);
// 2 根据奇数偶数分区
if (result % 2 == 0) {
return 0;
}else {
return 1;
}
}
}
2 在驱动中配置加载分区,设置 reducetask 个数
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);
3需求 3: 对每一个 maptask 的输出局部汇总(Combiner)
0 需求: 统计过程中对每一个 maptask 的输出进行局部汇总,以减小网络传输量即采用Combiner 功能。
方案一
1 增加一个 WordcountCombiner 类继承 Reducer
package com.da.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 1 汇总操作
int count = 0;
for (IntWritable v : values) {
count += v.get();
}
// 2 写出
context.write(key, new IntWritable(count));
}
}
2 在 WordcountDriver 驱动类中指定 combiner
// 9 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordcountCombiner.class);
方案二
1 将 WordcountReducer 作为 combiner 在 WordcountDriver 驱动类中指定
// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordcountReducer.class);
4 需求 4:大量小文件的切片优化(CombineTextInputFormat)
0 需求: 将输入的大量小文件合并成一个切片统一处理。
1 输入数据:准备 5 个小文件
2 实现过程
1) 不做任何处理,运行需求 1 中的 wordcount 程序,观察切片个数为 5
2) 在 WordcountDriver 中增加如下代码, 运行程序,并观察运行的切片个数为 1
// 如果不设置 InputFormat, 它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
相关文章
- 暂无相关文章
用户点评