阿里云Spark Shuffle的优化

辰石 大数据技术与架构

本次分享者:辰石,来自阿里巴巴计算平台事业部EMR团队技术专家,目前从事大数据存储以及Spark相关方面的工作。

  • Spark Shuffle介绍
  • Smart Shuffle设计
  • 性能分析

    Spark Shuffle流程

  • Spark 0.8及以前 Hash Based Shuffle
  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
  • Spark 0.9 引入ExternalAppendOnlyMap
  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
  • Spark 1.4 引入Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出历史舞台
    总结一下, 就是最开始的时候使用的是 Hash Based Shuffle, 这时候每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M x R ,其中M是Map的个数,R是Reduce的个数。这样会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量。后面忍不了了就做了优化,把在同一core上运行的多个Mapper 输出的合并到同一个文件,这样文件数目就变成了 cores R 个了。

Spark Shuffle实现

Sort-based shuffle介绍

这个方式的选择是在org.apache.spark.SparkEnv完成的:

    // Let the user specify short names forshuffle managers    val shortShuffleMgrNames = Map(      "hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",      "sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")    val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //获得Shuffle Manager的type,sort为默认    val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)    val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)

Hashbased shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M x R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。Hash based shuffle设计的目标之一就是避免不需要的排序(Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的DiskIO和内存的消耗,这无疑很影响性能。
Hash based shuffle也在不断的优化中,正如前面讲到的Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。为了更好的解决这个问题,Spark 1.1 引入了Sort based shuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。
避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件对系统带来的压力。
目前writer的实现分为三种, 分为 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。

SortShuffleManager只有BlockStoreShuffleReader这一种ShuffleReader。

Spark-shuffle存在的问题

同步操作

Shuffle数据只有等map task任务结束后可能会触发多路归并生成最终数据。

大量的磁盘IO

Shuffle的数据在Merge阶段存在大量的磁盘读写IO,在sort-merge阶段对磁盘IO带宽要求较高。

计算与网络的串行

Task任务计算和网络IO的串行操作。

Smart Shuffle

shuffle数据的pipeline

shuffle数据在map端累积到一定数量发送到reduce端。

避免不必要的网络IO

根据partition数量的位置,可以调度该reduce任务到相应的节点。

计算和网络IO的异步化

shuffle数据的生成和shuffle数据的发送可以并行执行。

避免sort-merge减少磁盘IO

shuffle数据是按照partition进行分区,shuffle数据无需sort-merge

Smart Shuffle使用

  • 配置spark.shuffle.manager : org.apache.spark.shuffle.hash.HashShuffleManager
  • 配置spark.shuffle.smart.spill.memorySizeForceSpillThreshold:控制shuffle数据占用内存的大小,默认为128M
  • 配置spark.shuffle.smart.transfer.blockSize:控制shuffle在网络传输数据块的大小

    性能分析

硬件及软件资源:

TPC-DS性能:

Smart shuffle TPC-DS性能提升28%:

  • Smart shuffle没有打来单个query性能的下降
  • 单个query最大能够带来最大2倍的性能提升
    提取Q2和Q49查询性能分析:
  • Q2在两种shuffle性能保持一致
  • Q49在Smart shuffle下性能有很大提升
    单个查询对比:

左侧为sorted shuffle,右边为smart shuffle。 Q2查询相对简单,shuffle数据也比较少,smart shuffle性能保持不变。
Q2 CPU对比: 左侧为sorted shuffle,右侧是smart shuffle

磁盘对比:
左侧为sorted shuffle,右侧是smart shuffle

©著作权归作者所有:来自51CTO博客作者mob604756e85b28的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. Python_学习之文件读写
  2. 汉森的研究反省:充分利用数据来对经济建模 | 计算机和经济学交叉
  3. 2020年要做的几件大事
  4. 深入浅出谈 Capital One 数据泄露事件
  5. 大数据独角兽 MapR 难以为继,将资产出售给惠普企业
  6. 年轻人你渴望力量吗 | 我读过的一些书推荐
  7. Apache老母鸡又下蛋?一文俯瞰Apache Superset
  8. 基于Prometheus+Grafana打造企业级Flink监控系统
  9. 企业级大数据平台建设参考 | 淘宝&滴滴&美团&360&快手&京东

随机推荐

  1. Android Support v4、v7、v13的区别和应
  2. Android4.4支持使用DevTools对原生Androi
  3. Android MediaStore数据之增删查改剖析
  4. 让Android中的webview支持页面中的文件上
  5. Android之——多线程下载示例
  6. Android倒计时Button
  7. Android的调试原理--学习笔记
  8. Windows Phone 7 不温不火学习之《ListBo
  9. android 调用系统 录像,录音等
  10. Android的联通性---USB的从属模式(一)