`
hugh.wangp
  • 浏览: 288784 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Hadoop 中使用DistributedCache遇到的问题

阅读更多

 

自己在写MAR/REDUCE代码时,遇到了一个问题,一个大数据文件和一个小数据文件匹配计算,但是小数据文件太小,所以想采用HIVE的MAP JOIN的方式,把小数据文件放到直接大数据文件map的datanode的内存中,这样少了MR代码的1对N的数据文件关联。

实现这个的最佳方案就是利用distributed cache。HIVE的MAP JOIN也是利用这个技术。

首先简要介绍一下distributed cache是如何使用的,然后总结下自己在使用distributed cache遇到的问题,这些问题网上也有人遇到,但是没有给出明确的解释。希望能够帮助同样遇到此类问题的朋友。

distributed cache至少有如下的两类类应用:
1.MAP、REDUCE本身和之间共享的较大数据量的数据
2.布置第三方JAR包,可以避免集群的删减导致部分依赖的机器的JAR包的丢失

distributed cache使用的流程总结如下:
1.在HDFS上准备好要共享的数据(text、archive、jar)
2.在distributed cache中添加文件
3.在mapper或者reducer类中获取数据
4.在map或者reduce函数中使用数据


1.数据本来就在HDFS上,所以省去了流程中的第一步
可以使用hadoop fs -copyFromLocal把本地文件cp到HDFS上
2.在distributed cache中添加文件
public static void disCache(String dimDir, JobConf conf) throws IOException {
           FileSystem fs = FileSystem.get(URI.create(dimDir), conf);
           FileStatus[] fileDir = fs.listStatus(new Path(dimDir));
           for (FileStatus file : fileDir) {
                DistributedCache.addCacheFile(URI.create(file.getPath().toString()), conf);
           }
}
因为我利用的数据是HIVE脚本生成的,所以无法指定具体的文件路径,采用这种方式把一张HIVE表的所有数据都加载到cache中。如果能直接明确知道文件名称就简单很多了,例如:
DistributedCache.addCacheFile(URI.create(“/mytestfile/file.txt”), conf);
3.在mapper或者reducer中获取数据
public void configure(JobConf job) {
          try {
               urlFile = DistributedCache.getLocalCacheFiles(job);
               seoUrlFile = "file://" + urlFile[0].toString();
               allUrlFile = "file://" + urlFile[1].toString();
               getData(seoUrlFile, seoUrlDim);
               getData(allUrlFile, allUrlDim);
          } catch (IOException e) {
                     // TODO Auto-generated catch block
               e.printStackTrace();
           }
}
      我在流程2中添加了两个文件,通过DistributedCache.getLocalCacheFiles(job)获取Path[],针对不同文件调用getData,把文件数据放在不同的List中seoUrlDim,allUrlDim
     如此一来,distributed cache的过程就结束了,接下来就在map()或者reduce()中使用这些数据就OK了。
4.在map或者reduce函数中使用数据
     一定要区分流程3和4,3是获取数据,4是使用数据。我就是在前期没弄明白这个的差异,导致内存溢出。


虽说好像挺简单的,但是在实现这个代码的过程中有如下几个问题困扰了我好久,网上也没找到很好的解决方案,后来在兄弟的帮助下搞定
1.FileNotFoundException
2.java.lang.OutOfMemoryError: GC overhead limit exceeded
1.FileNotFoundException
     这个问题涉及到DistributedCache.getLocalCacheFiles(job) 这个函数,此函数返回的Path[]是执行map或者reduce的datanode的本地文件系统中路径,但是我在getData中利用的SequenceFile.Reader的默认filesystem是hdfs,这就导致获取数据时是从hdfs上找文件,但是这个文件是在本地文件系统中,所以就会报出这个错误FileNotFoundException。
     例如,DistributedCache.getLocalCacheFiles(job)返回的PATH路径是:/home/dwapp/hugh.wangp/mytestfile/file.txt,在默认文件系统是hdfs时,获取数据时会读hdfs://hdpnn:9000/home/dwapp/hugh.wangp/mytestfile/file.txt,但是我们是在本地文件系统,为了避免数据获取函数选择"错误"的文件系统,我们在/home/dwapp/hugh.wangp/mytestfile/file.txt前加上"file://",这样就会从本地文件系统中读取数据了。就像我例子中的seoUrlFile = "file://" + urlFile[0].toString();

2.java.lang.OutOfMemoryError: GC overhead limit exceeded
     这个问题是我在没搞明白如何使用distributed cache时犯下的错误,也就是我没弄明白流程3和4的区别。我把流程3中说的获取数据的过程放在map函数中,而在map函数中其实是使用数据的过程。这个错误因为使每用一个map就要获取一下数据,也就是初始化一个list容器,使一个datanode上起N个map,就要获取N个list容器,内容溢出也就是自然而然的事情了。
     我们一定要把获取数据的过程放在mapper或reducer类的configure()函数中,这样对应一个datanode就只有一份数据,N个map可以共享着一份数据。


内容单薄,望志同道合之士互相学习。
分享到:
评论
14 楼 bottle1 2014-07-29  
我也遇到FileNotFoundException这个问题,发现通过DistributedCache.getLocalCacheFiles(job);获取的是本地的路径,如:file:\usr\hadoop\tmp\mapred\local\1406614959849\user.csv,
我是在eclipse里运行的,请问如何解决这个问题呢
13 楼 yongqi 2013-08-25  
hi hugh.wangp:
   请教您一个问题,我现在也在被java.io.FileNotFoundException的问题困扰着,昨天弄到凌晨4点,依然不知所然。。。
   我现在运行一个关于DistributedCache的程序,程序是hadoop in action中的示例,程序本身还是比较简单的,遇到的问题是,如果在HDFS和Linux文件系统中,相同路径下同时存在我需要缓存的文件,则程序可以正常运行,但两个文件系统中缺一不可,否则就会报FileNotFoundException的异常。。而且linux系统下的路径必须的HDFS中的路径相同。但我做过测试,程序确实读取的是Linux中的那个文件。
    我试了你上边说的加上“file://”,也是不行的。

    请问您是否遇到过这样的问题,估计是哪里的原因?
12 楼 leibnitz 2012-08-15  
leibnitz 写道
我说的是一个mapper而不是一个map()调用;
而且好像是当这个job done之后就不用可以再用这个cache file的吧?


后经证实,如果在同台机器已经有了dist cache file,不会再次download的
11 楼 leibnitz 2012-08-13  
其实我看漏了一点,就是使用cache之前在将其发布到Hdfs上,所以刚开始时我以为会将hdfs上的文件同时删除.其实是不会的,只删除local cached files


想问另一个问题:你们在elcipse开发后,怎样直接提交mapred到集群上?我用eclipse plugin 的run on hadoop没效,说找不到map.class?
或者可以加一个q以便联系:15541307
10 楼 hugh.wangp 2012-08-13  
是的,job done后是不能用这个cache file,如果另一个job还是需要这个file,就是要再次做cop to local。
一个mapper()是需要调用一次distributed cache。如果很多map在一台机器上,distributed cache就没有太多优势,但很难保证很多map在一台机器上。distributed cache效果就很明显了。
9 楼 leibnitz 2012-08-13  
我说的是一个mapper而不是一个map()调用;
而且好像是当这个job done之后就不用可以再用这个cache file的吧?
8 楼 hugh.wangp 2012-08-13  
leibnitz 写道
如果有很多maps和reduces,或者map与reduce在同一台机器上,这样不就导致太多的copy to local过程吗?
这种情况相比直接放在hdfs上有多大的好处?

这种copy to local不是1个map就调用1次,而是一次性的。
7 楼 leibnitz 2012-08-13  
如果有很多maps和reduces,或者map与reduce在同一台机器上,这样不就导致太多的copy to local过程吗?
这种情况相比直接放在hdfs上有多大的好处?
6 楼 hugh.wangp 2012-07-19  
DistributedCache.addCacheFile()装载的文件是在hdfs上的吗?是不是添加文件这步就没有成功。如果可以的话,代码发给我(hugh.wangp@gmail.com)看看,没看到代码,有些细节可能考虑不到。
5 楼 xueyinv86 2012-07-19  
没想到hugh这么快就有回复了,激动啊。
可是我是在DistributedCache.getLocalCacheFiles(job); 的时候没有get到文件路径,哪怕是hsdf的路径都没有,调试的时候是个null值呀
4 楼 hugh.wangp 2012-07-19  
路径是本地路径,不是hdfs路径,所以你拼路径的时候必须加前缀"file://"说明是本地路径。hadoop默认访问的路径是hdfs
3 楼 xueyinv86 2012-07-19  
urlFile = DistributedCache.getLocalCacheFiles(job);  是null的..不能获取到路径,为什么啊
2 楼 hugh.wangp 2012-06-10  
可以直接在获取URL的前后System.out.println()打印些标签定位错误
1 楼 neyshule 2012-06-10  
java.io.FileNotFoundException: File /juker/cm does not exist.
你好,我也出现了相同的错误,请问怎么打印才能找出这个错呢?

相关推荐

Global site tag (gtag.js) - Google Analytics