- 浏览: 288488 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
onlyamoment:
请问为什么要限制不合并文件呢?事实上,用动态分区写表时候容易出 ...
HIVE动态分区参数配置 -
alexss1988:
请问楼主,RCFILE由于列式存储方式,数据加载时性能消耗较大 ...
HIVE文件存储格式的测试比较 -
空谷悠悠:
jersey文档中提到:Client instances ar ...
自整理手册Jersey Client API -
bottle1:
我也遇到FileNotFoundException这个问题,发 ...
Hadoop 中使用DistributedCache遇到的问题 -
yongqi:
hi hugh.wangp: 请教您一个问题,我现在也在被 ...
Hadoop 中使用DistributedCache遇到的问题
by hugh.wangp
我们的数据绝大多数都是在HIVE上,对HIVE的SEQUENCEFILE和RCFILE的存储格式都有利用,为了满足HIVE的数据开放,hive client的方式就比较单一,直接访问HIVE生成的HDFS数据也是一种必要途径,所以本文整理测试了如何编写基于TEXTFILE、SEQUENCEFILE、RCFILE的数据的map reduce的代码。以wordcount的逻辑展示3种MR的代码。
其实只要知道MAP的输入格式是什么,就知道如何在MAP中处理数据;只要知道REDUCE(也可能只有MAP)的输出格式,就知道如何把处理结果转成输出格式。
表1:
如下代码片段是运行一个MR的最简单的配置:定义job、配置job、运行job
//map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 JobConf conf = new JobConf(WordCountRC.class); //设置一个用户定义的job名称 conf.setJobName("WordCountRC"); //为job的输出数据设置Key类 conf.setOutputKeyClass(Text.class); //为job输出设置value类 conf.setOutputValueClass(IntWritable.class); //为job设置Mapper类 conf.setMapperClass(MapClass.class); //为job设置Combiner类 conf.setCombinerClass(Reduce.class); //为job设置Reduce类 conf.setReducerClass(Reduce.class); //为map-reduce任务设置InputFormat实现类 conf.setInputFormat(RCFileInputFormat.class); //为map-reduce任务设置OutputFormat实现类 conf.setOutputFormat(TextOutputFormat.class); //为map-reduce job设置路径数组作为输入列表 FileInputFormat.setInputPaths(conf, new Path(args[0])); //为map-reduce job设置路径数组作为输出列表 FileOutputFormat.setOutputPath(conf, new Path(args[1])); //运行一个job JobClient.runJob(conf);
而此刻,我们更多的是关注配置InputFormat和OutputFormat的setInputFormat和setOutputFormat。根据我们不同的输入输出做相应的配置,可以选择表1的任何格式。
当我们确定了输入输出格式,接下来就是来在实现map和reduce函数时首选对输入格式做相应的处理,然后处理具体的业务逻辑,最后把处理后的数据转成既定的输出格式。
如下是处理textfile、sequencefile、rcfile输入文件的wordcount代码,大家可以比较一下具体区别,应该就能处理更多其它输入文件或者输出文件格式的数据。
代码1:textfile版wordcount
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class WordCountTxt{ public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCountTxt.class); conf.setJobName("wordcounttxt"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
代码2:sequencefile版wordcount
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCountSeq { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(Text key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // TODO Auto-generated method stub JobConf conf = new JobConf(WordCountSeq.class); conf.setJobName("wordcountseq"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(SequenceFileAsTextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
代码3:rcfile版wordcount
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCountRC { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, BytesRefArrayWritable, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word =new Text(); @Override public void map(LongWritable key, BytesRefArrayWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { Text txt = new Text(); txt.set(value.get(0).getData(), value.get(0).getStart(), value.get(0).getLength()); String[] result = txt.toString().split("\\s"); for(int i=0; i < result.length; i++){ word.set(result[i]); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterator<IntWritable> value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (value.hasNext()) { sum += value.next().get(); } result.set(sum); output.collect(key, result); } } /** * @param args */ public static void main(String[] args) throws IOException{ JobConf conf = new JobConf(WordCountRC.class); conf.setJobName("WordCountRC"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(RCFileInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
原始数据:
hadoop fs -text /group/alidw-dev/seq_input/attempt_201201101606_2339628_m_000000_0 12/02/13 17:07:57 INFO util.NativeCodeLoader: Loaded the native-hadoop library 12/02/13 17:07:57 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor 12/02/13 17:07:57 INFO compress.CodecPool: Got brand-new decompressor hello, i am ok. are you? i am fine too!
编译打包完成后执行:
hadoop jarWordCountSeq.jar WordCountSeq /group/alidw-dev/seq_input/ /group/alidw-dev/rc_output
执行完毕就能看到最终结果:
hadoop fs -cat /group/alidw-dev/seq_output/part-00000 am 2 are 1 fine 1 hello, 1 i 2 ok. 1 too! 1 you? 1
发表评论
-
【转】Hadoop 中的两表join
2012-08-09 10:35 1584原文见:http://www.gemini5201314 ... -
HIVE动态分区参数配置
2012-07-30 15:33 12550设置如下参数开启动态分区: hive.exec ... -
配置HIVE执行的本地模式
2012-07-21 09:20 4238自0.7版本后Hive开始支持任务执行选择本地模式(l ... -
HIVE表数据量和数据记录数的矛与盾
2012-07-06 09:45 12116HIVE作为在Hadoop ... -
HIVE如何使用自定义函数
2012-06-28 19:44 2666HIVE提供了很多函数 ... -
[陷阱]HIVE外部分区表一定要增加分区
2012-06-27 16:43 13077刚开始玩HIVE外部表可能会遇到的小陷阱。 只要 ... -
HIVE元数据
2012-06-20 12:52 13304HIVE元数据表数据字典: 表名 ... -
LINUX下单机安装HADOOP+HIVE手册
2012-05-31 15:59 2165HADOOP篇 HADOOP安装 1.tar - ... -
小文件合并
2012-05-03 13:07 3117文件数目过多,增加namenode的压力,hdfs的 ... -
HIVE UDF/UDAF/UDTF的Map Reduce代码框架模板
2012-04-01 10:09 5835自己写代码时候的利用到的模板 UDF步骤: ... -
HIVE文件存储格式的测试比较
2012-02-13 17:26 3630by hugh.wangp 根据自身涉及到的数据分布和 ...
相关推荐
下面解释了涉及的每个文件: 注意:PlayByPlay Map-Reduce 作业是由 Jesse Anderson ( ) 编写的代码的分支PLAYBYPLAYDRIVER.java:运行 Map Reduce 作业的 Java 文件。 调用 PLAYBYPLAYMAPPPER.java 和 ...
大数据框架(HADOOP、HIVE、HBASE)优化和简历项目编写(视频+讲义+笔记),内容包括但不限于: 01_回顾复习HADOOP阶段课程讲解【案例项目】 02_MapReduceShuffle回顾及性能优化详解 03_MapReduce 二次排序回顾及...
3.4 自定义文件格式 3.4.1 输入输出格式 技术点18 输入和输出格式为CSV 的文件 3.4.2 output committing 的重要性 3.5 本章小结 第3 部分 大数据模式 4 处理大数据的MapReduce 模式 4.1 Join ...
技术点16 使用Thrift3.3.5 Avro技术点17 MapReduce 的下一代数据序列化技术3.4 自定义文件格式3.4.1 输入输出格式技术点18 输入和输出格式为CSV 的文件3.4.2 output committing 的重要性 3.5 本章小...
map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据块 namenode和...
map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 ...
5.2.1 Reduce侧的联结 5.2.2 基于DistributedCache的复制联结 5.2.3 半联结:map侧过滤后在reduce侧联结 5.3 创建一个Bloom filter 5.3.1 Bloom filter做了什么 5.3.2 实现一个Bloom filter 5.3.3 Hadoop 0.20...
map侧过滤后在reduce侧联结5.3 创建一个Bloom filter5.3.1 Bloom filter做了什么5.3.2 实现一个Bloom filter5.3.3 Hadoop 0.20 以上版本的Bloom filter5.4 温故知新5.5 小结5.6 更多资源第6 章 编程实践6.1 开发...
895.2.1 Reduce侧的联结 905.2.2 基于DistributedCache的复制联结 985.2.3 半联结:map侧过滤后在reduce侧联结 1015.3 创建一个Bloom filter 1025.3.1 Bloom filter做了什么 1025.3.2 实现一个Bloom filter 1045.3.3...
865.1.3 预处理和后处理阶段的链接 865.2 联结不同来源的数据 895.2.1 Reduce侧的联结 905.2.2 基于DistributedCache的复制联结 985.2.3 半联结:map侧过滤后在reduce侧联结 1015.3 创建一个Bloom filter...
HDFS YARN MapReduce Map阶段并⾏处理数据 Reduce阶段对Map处理数据的结构进⾏汇总 ⼤数据体系 名词解释 序 序 号 号 名称 名称 描述 描述 1 Sqoop Sqoop是⼀款开源的⼯具,主要⽤于在Hadoop、Hive与传统的数据库...