博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce Application中mapper的数目和分片的数目
阅读量:6871 次
发布时间:2019-06-26

本文共 6876 字,大约阅读时间需要 22 分钟。

hot3.png

问题 MapReduce Application中mapper的数目和分片的数目是一样的

  • 默认情况下,分片和输入文件的分块数是相等的。也不完全相等,如果block size大小事128M,文件大小为128.1M,文件的block数目为2,但是application运行过程中,你会发现分片数目是1,而不是2,其中的机理,后面会分析

  • 有的程序会设置map的数目,那么map数目是怎样影响分片的数目的呢?

  • 如果文件大小为0,是否会作为一个分片传给map任务?

流程
FileInputFormat.getSplits返回文件的分片数目,这部分将介绍其运行流程,后面将粘贴其源码并给出注释

  • 通过listStatus()获取输入文件列表files,其中会遍历输入目录的子目录,并过滤掉部分文件,如文件_SUCCESS

  • 获取所有的文件大小totalSIze

  • goalSIze=totalSize/numMaps。numMaps是用户指定的map数目

  • files中取出一个文件file

  • 计算splitSize。splitSize=max(minSplitSize,min(file.blockSize,goalSize)),其中minSplitSize是允许的最小分片大小,默认为1B

  • 后面根据splitSize大小将file分片。在分片的时候,如果剩余的大小不大于splitSize*1.1,且大于0B的时候,会将该区域整个作为一个分片。这样做是为了防止一个mapper处理的数据太小

  • 将file的分片加入到splits中

  • 返回4,直到将files遍历完

  • 结束,返回splits

092534_o6ni_1775885.jpg 

源码(hadoop2.2.0)

其实流程算起来也不算复杂,所以就直接用代码注释来做吧

 

这里边涉及这么几个方法:

1、public List<InputSplit> getSplits(JobContext job), 这个由客户端调用来获得当前Job的所有分片(split),然后发送给JobTracker(新API中应该是ResourceManager),而JobTracker根据这些分片的存储位置来给TaskTracker分配map任务去处理这些分片。这个方法用到了后边的listStatus,然后根据得到的这些文件信息,从FileSystem那里去拉取这些组成这些文件的块的信息(BlockLocation),使用的是getFileBlockLocation(file,start,len),这个方法是与使用的文件系统实现相关的(FileSystem,LocalFileSystem,DistributedFileSystem)

/**    * Generate the list of files and make them into FileSplits.   * @param job the job context   * @throws IOException   */  public List
 getSplits(JobContext job) throws IOException {    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));    long maxSize = getMaxSplitSize(job);    // generate splits    List
 splits = new ArrayList
();    List
 files = listStatus(job);    for (FileStatus file: files) {      Path path = file.getPath();      long length = file.getLen();      if (length != 0) {        BlockLocation[] blkLocations;        if (file instanceof LocatedFileStatus) {          blkLocations = ((LocatedFileStatus) file).getBlockLocations();        } else {          FileSystem fs = path.getFileSystem(job.getConfiguration());          blkLocations = fs.getFileBlockLocations(file, 0, length);        }        if (isSplitable(job, path)) {          long blockSize = file.getBlockSize();          long splitSize = computeSplitSize(blockSize, minSize, maxSize);          long bytesRemaining = length;          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);            splits.add(makeSplit(path, length-bytesRemaining, splitSize,                                     blkLocations[blkIndex].getHosts()));            bytesRemaining -= splitSize;          }          if (bytesRemaining != 0) {            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,                       blkLocations[blkIndex].getHosts()));          }        } else { // not splitable          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));        }      } else {         //Create empty hosts array for zero length files        splits.add(makeSplit(path, 0, length, new String[0]));      }    }    // Save the number of input files for metrics/loadgen    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());    LOG.debug("Total # of splits: " + splits.size());    return splits;  }

2、protected List<FileStatus> listStatus(JobContext job), 先根据“mapred.input.dir”的配置值去得到用户指定的所有Path。然后根据这个JobContext的Configuration得到FileSystem(当然,更可能是 DistributedFileSystem )。最后应用用户可能设置了的PathFilter,通过FileSystem获取所有这些Path所代表的File(FileStatus)。注:这个方法的东西相当多,很多内容还十分陌生。

/** List input directories.   * Subclasses may override to, e.g., select only files matching a regular   * expression.    *    * @param job the job to list input paths for   * @return array of FileStatus objects   * @throws IOException if zero items.   */  protected List
 listStatus(JobContext job                                        ) throws IOException {    List
 result = new ArrayList
();    Path[] dirs = getInputPaths(job);    if (dirs.length == 0) {      throw new IOException("No input paths specified in job");    }        // get tokens for all the required FileSystems..    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,                                         job.getConfiguration());    // Whether we need to recursive look into the directory structure    boolean recursive = getInputDirRecursive(job);        List
 errors = new ArrayList
();        // creates a MultiPathFilter with the hiddenFileFilter and the    // user provided one (if any).    List
 filters = new ArrayList
();    filters.add(hiddenFileFilter);    PathFilter jobFilter = getInputPathFilter(job);    if (jobFilter != null) {      filters.add(jobFilter);    }    PathFilter inputFilter = new MultiPathFilter(filters);        for (int i=0; i < dirs.length; ++i) {      Path p = dirs[i];      FileSystem fs = p.getFileSystem(job.getConfiguration());       FileStatus[] matches = fs.globStatus(p, inputFilter);      if (matches == null) {        errors.add(new IOException("Input path does not exist: " + p));      } else if (matches.length == 0) {        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));      } else {        for (FileStatus globStat: matches) {          if (globStat.isDirectory()) {            RemoteIterator
 iter =                fs.listLocatedStatus(globStat.getPath());            while (iter.hasNext()) {              LocatedFileStatus stat = iter.next();              if (inputFilter.accept(stat.getPath())) {                if (recursive && stat.isDirectory()) {                  addInputPathRecursively(result, fs, stat.getPath(),                      inputFilter);                } else {                  result.add(stat);                }              }            }          } else {            result.add(globStat);          }        }      }    }    if (!errors.isEmpty()) {      throw new InvalidInputException(errors);    }    LOG.info("Total input paths to process : " + result.size());     return result;  }

3、protected long computeSplitSize(long blockSize, long minSize, long maxSize),计算出当前Job所配置的分片最大尺寸。

  protected long computeSplitSize(long blockSize, long minSize,                                  long maxSize) {    return Math.max(minSize, Math.min(maxSize, blockSize));  }

4、protected int getBlockIndex(BlockLocation[] blkLocations, long offset), 由于组成文件的块的信息已经获得了,只需要根据offset来计算所在的那个块就行了。

  protected int getBlockIndex(BlockLocation[] blkLocations,                               long offset) {    for (int i = 0 ; i < blkLocations.length; i++) {      // is the offset inside this block?      if ((blkLocations[i].getOffset() <= offset) &&          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){        return i;      }    }    BlockLocation last = blkLocations[blkLocations.length -1];    long fileLength = last.getOffset() + last.getLength() -1;    throw new IllegalArgumentException("Offset " + offset +                                        " is outside of file (0.." +                                       fileLength + ")");  }

 转自:, 

转载于:https://my.oschina.net/KingPan/blog/288549

你可能感兴趣的文章
《Windows Server 2012 Hyper-V虚拟化管理实践》一1.2 Hyper-V安装前后的变化
查看>>
Proxmox VE 4.4 发布,新 Ceph 仪表盘上线
查看>>
《CCNP TSHOOT(642-832)学习指南》一1.2 维护进程及维护流程
查看>>
华为宣布开源流处理平台查询语言 StreamCQL
查看>>
2016 年 6 月 RedMonk 编程语言排行榜
查看>>
《Adobe Photoshop CC经典教程(彩色版)》—第1课1.4节在Photoshop中还原操作
查看>>
HttpClient使用详解
查看>>
增强现实?先不要指望那些眼镜了
查看>>
《iOS 6核心开发手册(第4版)》——1.10节秘诀:使用多触摸交互
查看>>
《云数据管理:挑战与机遇》一第1章
查看>>
《嵌入式C编程实战》——1.5 软件开发工具
查看>>
分析3000份技术面试数据:这几大指标比你毕业于哪所学校更要紧
查看>>
Linux有问必答:如何检查PDF中使用了哪种字体
查看>>
《Lua游戏AI开发指南》一2.1 新建一个沙箱项目
查看>>
如何使用 Weave 以及 Docker 搭建 Nginx 反向代理/负载均衡服务器
查看>>
《Android 应用测试指南》——第1章,第1.4节测试的种类
查看>>
对jquery val 获取input 文本框值进行扩展
查看>>
MySQL (select_paren) union_order_or_limit 行为
查看>>
并发不是并行,它更好!
查看>>
nltk 自己训练模型例子
查看>>