欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

WordCount 案例,wordcount案例

来源: javaer 分享于  点击 44836 次 点评:199

WordCount 案例,wordcount案例


1需求 1: 统计一堆文件中单词出现的个数
0 需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
1 数据准备:hello.txt

hadoop spring java
java spark
hadoop java spring
hive zookeeper solr
spark strom
hadoop spring java
java spark
hadoop java spring
hive zookeeper solr
spark strom

2 分析
按照 mapreduce 编程规范,分别编写 Mapper, Reducer, Driver。


3 编写程序
1) 编写 mapper 类

package com.da.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 输入的key LongWritable 行号
 * 输入的value Text 一行内容
 * 输出的key Text 单词
 * 输出的value IntWritable 单词的个数
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text k = new Text();
    private IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 一行内容转换成string
        String line = value.toString();

        // 2 切割
        String[] words = line.split(" ");

        // 3 循环写出到下一个阶段
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }

}

2) 编写 reducer 类

package com.da.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1 统计单词总个数
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // 2 输出单词总个数
        v.set(sum);
        context.write(key, v);
    }
}

3) 编写驱动类

package com.da.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {
    public static void main(String[] args) throws Exception {

        // 1 获取job信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 获取jar包位置
        job.setJarByClass(WordcountDriver.class);

        // 3 关联自定义的mappper和reducer
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4 设置map输出数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 设置最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 设置数据输入和输出文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交代码
        // job.submit();
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4 集群上测试
1) 将程序打成 jar 包,然后拷贝到 hadoop 集群中。
2) 启动 hadoop 集群
3) 执行 wordcount 程序

[joker@hadoop102 hadoop-2.7.2]# ls
bin  data  etc  include  input  lib  libexec  LICENSE.txt  logs  NOTICE.txt  output  README.txt  sbin  share  wait.sh  wcinput  wc.jar  wcoutput

[joker@hadoop102 hadoop-2.7.2]# hadoop fs -put wcinput/hello.txt /user/joker/input/ 

[joker@hadoop102 hadoop-2.7.2]# hadoop jar wc.jar com.da.mr.WordcountDriver /user/joker/input /user/joker/output

[joker@hadoop102 hadoop-2.7.2]# hadoop fs -cat /user/joker/output/part-r-00000
hadoop  4
hive    2
java    6
solr    2
spark   4
spring  4
strom   2
zookeeper       2

5 本地测试
1)在 windows 环境上配置 HADOOP_HOME 环境变量。
2)在 eclipse 上运行程序,设置好输入参数运行main

3) 注意:如果 eclipse 打印不出日志, 在控制台上只显示

1.log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
2.log4j:WARN Please initialize the log4j system properly.
3.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

需要在项目的 src 目录下,新建一个文件,命名为“log4j.properties”, 在文件中填入

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

2 需求 2:把单词按照 ASCII 码奇偶分区(Partitioner)
0 分析

1 自定义分区

package com.da.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordCountPartitioner extends Partitioner<Text, IntWritable>{
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // 1 获取单词 key
        String firWord = key.toString().substring(0, 1);
        char[] charArray = firWord.toCharArray();
        int result = charArray[0];
        // int result = key.toString().charAt(0);
        // 2 根据奇数偶数分区
        if (result % 2 == 0) {
            return 0;
        }else {
            return 1;
        }
    }
}

2 在驱动中配置加载分区,设置 reducetask 个数

job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);

3需求 3: 对每一个 maptask 的输出局部汇总(Combiner)
0 需求: 统计过程中对每一个 maptask 的输出进行局部汇总,以减小网络传输量即采用Combiner 功能。

方案一
1 增加一个 WordcountCombiner 类继承 Reducer

package com.da.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1 汇总操作
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        // 2 写出
        context.write(key, new IntWritable(count));
    }
}

2 在 WordcountDriver 驱动类中指定 combiner

// 9 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordcountCombiner.class);

方案二
1 将 WordcountReducer 作为 combiner 在 WordcountDriver 驱动类中指定

// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordcountReducer.class);

4 需求 4:大量小文件的切片优化(CombineTextInputFormat)
0 需求: 将输入的大量小文件合并成一个切片统一处理。
1 输入数据:准备 5 个小文件
2 实现过程
1) 不做任何处理,运行需求 1 中的 wordcount 程序,观察切片个数为 5
2) 在 WordcountDriver 中增加如下代码, 运行程序,并观察运行的切片个数为 1

// 如果不设置 InputFormat, 它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

相关文章

    暂无相关文章
相关栏目:

用户点评