欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

hadoop m/r job模板,hadoopjob,hadoop-0.20.

来源: javaer 分享于  点击 29190 次 点评:185

hadoop m/r job模板,hadoopjob,hadoop-0.20.


hadoop-0.20.2 map/reduce job模板

[Java]代码

package hadoop;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class JobTemplate extends Configured implements Tool {    public static class M extends            Mapper<LongWritable, Text, LongWritable, Text> {        @Override        protected void map(LongWritable key, Text values, Context context)                throws IOException, InterruptedException {        }    }    public static class R extends            Reducer<LongWritable, Text, LongWritable, Text> {        @Override        protected void reduce(LongWritable key, Iterable<Text> values,                Context context) throws IOException, InterruptedException {        }    }    public static class P extends Partitioner<Text, LongWritable> {        @Override        public int getPartition(Text key, LongWritable value, int parts) {            int hash = key.toString().hashCode();            return (hash & Integer.MAX_VALUE) % parts;        }    }    public static class G implements RawComparator<Text> {        public int compare(Text o1, Text o2) {            return 0;        }        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {            return 0;        }    }    public static class C implements RawComparator<Text> {        public int compare(Text o1, Text o2) {            return 0;        }        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {            return 0;        }    }    public int run(String[] args) throws Exception {        Job job = new Job(getConf(), "Template this is!");        job.setJarByClass(JobTemplate.class);        job.setMapperClass(M.class);        // job.setCombinerClass(R.class);        job.setReducerClass(R.class);        // job.setPartitionerClass(P.class);        // job.setGroupingComparatorClass(G.class);        // job.setSortComparatorClass(C.class);        FileInputFormat.addInputPaths(job, args[0]);        // job.setInputFormatClass(LzoTextInputFormat.class);        // LzoTextInputFormat.addInputPaths(job, args[0]);        job.setOutputKeyClass(LongWritable.class);        job.setOutputValueClass(Text.class);        FileOutputFormat.setOutputPath(job, new Path(args[1]));        // job.setOutputFormatClass(TextOutputFormat.class);        // TextOutputFormat.setOutputPath(job, new Path(args[1]));        // TextOutputFormat.setCompressOutput(job, true);        // TextOutputFormat.setOutputCompressorClass(job, GzipCodec.class);        // job.setOutputFormatClass(SequenceFileOutputFormat.class);        // SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));        // SequenceFileOutputFormat.setCompressOutput(job, true);        // SequenceFileOutputFormat.setOutputCompressorClass(job,        // GzipCodec.class);        // SequenceFileOutputFormat.setOutputCompressionType(job,        // CompressionType.BLOCK);        boolean successful = job.waitForCompletion(true);        System.out.println(job.getJobID()                + (successful ? " :successful" : " :failed"));        return successful ? 0 : 1;    }    /**     * @param args     * @throws Exception     */    public static void main(String[] args) throws Exception {        System.out.println("Hello World!");        System.exit(ToolRunner.run(new Configuration(), new JobTemplate(), args));    }}

[Java]代码

package hadoop;import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;public class NoSplitInputFormat extends TextInputFormat {    @Override    protected boolean isSplitable(JobContext context, Path file) {        return false;    }}

[Java]代码

package hadoop;import java.util.Arrays;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableUtils;public abstract class TextComparator implements RawComparator<Text> {    @Override    public abstract int compare(Text o1, Text o2);    @Override    public final int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,            int l2) {        int n1 = WritableUtils.decodeVIntSize(b1[s1]);        int n2 = WritableUtils.decodeVIntSize(b2[s2]);        byte[] _b1 = Arrays.copyOfRange(b1, s1 + n1, s1 + l1);        byte[] _b2 = Arrays.copyOfRange(b2, s2 + n2, s2 + l2);        String t1 = new String(_b1);        String t2 = new String(_b2);        return compare(new Text(t1), new Text(t2));    }}
相关栏目:

用户点评