共享ip做网站wordpress 显示置顶文章
共享ip做网站,wordpress 显示置顶文章,青岛企业做网站,荆州seo推广4.6 Shuffle机制Shuffle的本义是洗牌、混洗#xff0c;即把⼀组有⼀定规则的数据打散重新组合转换成⼀组⽆规则随机数据分区。Spark中的Shuffle更像是洗牌的逆过程#xff0c;把⼀组⽆规则的数据尽量转换成⼀组具有⼀定规则的数据#xff0c;Spark中的Shuffle和MapReduce中…4.6 Shuffle机制Shuffle的本义是洗牌、混洗即把⼀组有⼀定规则的数据打散重新组合转换成⼀组⽆规则随机数据分区。Spark中的Shuffle更像是洗牌的逆过程把⼀组⽆规则的数据尽量转换成⼀组具有⼀定规则的数据Spark中的Shuffle和MapReduce中的Shuffle思想相同在实现细节和优化⽅式上不同因此掌握Hadoop的Shuffle原理的⽤户很容易将原有知识迁移过来。为什么Spark计算模型需要Shuffle过程我们都知道Spark计算模型是在分布式的环境下计算的这就不可能在单进程空间中容纳所有的计算数据来进⾏计算这样数据就按照Key进⾏分区分配成⼀块⼀块的⼩分区打散分布在集群的各个进程的内存空间中并不是所有计算算⼦都满⾜于按照⼀种⽅式分区进⾏计算。例如当需要对数据进⾏排序存储时就有了重新按照⼀定的规则对数据重新分区的必要Shuffle就是包裹在各种需要重分区的算⼦之下的⼀个对数据进⾏重新组合的过程。在逻辑上还可以这样理解[1]由于重新分区需要知道分区规则⽽分区规则按照数据的Key通过映射函数Hash或者Range等进⾏划分由数据确定出Key的过程就是Map过程同时Map过程也可以做数据处理例如在Join算法中有⼀个很经典的算法叫Map Side Join就是确定数据该放到哪个分区的逻辑定义阶段。Shuffle将数据进⾏收集分配到指定Reduce分区Reduce阶段根据函数对相应的分区做Reduce所需的函数处理。下⾯结合源码和图4-20从物理实现上看Spark的Shuffle是怎样实现的将Shuffle分为两个阶段Shuffle Write和Shuffle Fetch阶段Shuffle Fetch中包含聚集Aggregate在Spark中整个Job转化为⼀个有向⽆环图DAG来执⾏从图4-21中可以看出在整个DAG中是在每个Stage的承接阶段做Shuffle过程。图4-20中整个Job分为Stage0~Stage34个Stage。⾸先从最上端的Stage2、Stage3执⾏每个Stage对每个分区执⾏变换transformation的流⽔线式的函数操作执⾏到每个Stage最后阶段进⾏Shuffle Write将数据重新根据下⼀个Stage分区数分成相应的Bucket并将Bucket最后写⼊磁盘。这个过程就是Shuffle Write阶段。执⾏完Stage2、Stage3之后Stage1去存储有Shuffle数据节点的磁盘Fetch需要的数据将数据Fetch到本地后进⾏⽤户定义的聚集函数操作。这个阶段叫Shuffle FetchShuffle Fetch包含聚集阶段。这样⼀轮⼀轮的Stage之间就完成了Shuffle操作。下⾯我们更细粒度地将Shuffle的阶段进⾏拆分以更深⼊剖析和了解。1.Shuffle Write由于Spark的每个Stage中是通过执⾏任务来进⾏运算的⽽Spark中只分为两种任务ShuffleMapTask和ResultTask。其中ResultTask就是最底层的Stage也是整个任务执⾏的最后阶段将数据输出到Spark执⾏空间Stage除了这个阶段执⾏ResultTask其余阶段都执⾏ShuffleMapTask。因此主要的Shuffle Write逻辑存在这种任务的代码中。由于Shuffle属于⼤数据优化的⼀个很重要的阶段所以这⾥的代码优化会⽐较频繁下⾯基于Spark 1.0的代码进⾏介绍后续发展变化请读者参考相应版本。1Shuffle Write流程ShuffleWrite的⼊⼝是通过ShuffleMapTask中的runTask⽅法进⼊的也是整个Shuffle Write的控制⾻架。override def runTaskcontext TaskContext MapStatus { …… writer manager.getWriter[Any Any]dep.shuffleHandle partitionId context /*此处相当于使⽤ShuffleWriter将相应的分区进⾏Shuffle Write*/ writer.writerdd.iteratorsplit context.asInstanceOf[Iterator[_ Product2[Any Any]]] return writer.stopsuccess true.get …… }ShuffleWriter是个抽象的特征Trait下⾯看下它的具体实现。例如我们看看HashShuffleWriter中是怎样实现的HashShuffleWriter的主要功能其实就是判断是否需要做MapSideCombine或者做普通的Shuffle并且提供Shuffle Write各个流程的函数。override def writerecords Iterator[_ Product2[K V]] Unit { private val shuffle shuffleBlockManager.forMapTaskdep.shuffleId mapId numOutputSplits ser /*这⾥判断是否进⾏MapSideCombine也就是判断是否做Map端聚集合并如果合并能够在Map端做将会 很⼤程度减少⽹络传输的数据量减少开销*/ val iter if dep.aggregator.isDefined { if dep.mapSideCombine { dep.aggregator.get.combineValuesByKeyrecords context } else { records } …… for elem - iter { val bucketId dep.partitioner.getPartitionelem._1 /*这⾥调⽤ShuffleWriterGroup的writers获取数据写⼊器将数据写⼊bucket*/ shuffle.writersbucketId.writeelem } }下⾯进⼊ShuffleBlockManager来分析最终要做的Shuffle Write逻辑。从这段代码中可以看出Spark⽀持两种类型的ShuffleShuffle和优化的Consolidate Shuffle。val writers Array[BlockObjectWriter] if consolidateShuffleFiles { fileGroup getUnusedFileGroup Array.tabulate[BlockObjectWriter]numBuckets { bucketId val blockId ShuffleBlockIdshuffleId mapId bucketId /*两种Shuffle的区别其实是在对Bucket的处理是否写⼊FileGroup中 FileGroup就是⼀个⽂件数组存储⽂件的引⽤。在内存中维持这些FileGroup的引⽤*/ blockManager.getDiskWriterblockId fileGroupbucketId serializer bufferSize } } else { Array.tabulate[BlockObjectWriter]numBuckets { bucketId val blockId ShuffleBlockIdshuffleId mapId bucketId /*此处逻辑是获取相应的块由于每次都是第⼀次获取所以会创建新⽂件这⾥每次都会产⽣新的⽂件*/ val blockFile blockManager.diskBlockManager.getFileblockId …… blockManager.getDiskWriterblockId blockFile serializer bufferSize } }其中图4-21为Shuffle FileGroup的结构。Shuffle做Shuffle Write的细节如图4-22所⽰。注意这⾥的数据是直接写⼊缓冲中⽽未经过排序。最终在HashShuffleWriter将内存的Bucket写到磁盘存储为⽂件并将Shuffle的各个Bucket及映射信息返回给主节点。2Shuffle和Consolidate Shuffle对⽐下⾯从图4-23和图4-24中更加直观地对⽐Shuffle和Consolidate Shuffle的整体流程区别。图4-23中是进⾏Shuffle的整体流程假定该Shuffle中有3个Mapper和2个Reducer这样会产⽣3×26个Bucket也就是会产⽣6个Shuffle⽂件。因此产⽣的Shuffle⽂件个数为M×RM是Map任务个数R是Reduce任务数。图4-24是Consolidation Shuffle的流程图。其中每⼀个Bucket并⾮对应⼀个⽂件⽽是对应⽂件中的⼀个segment同时Consolidation Shuffle所产⽣的Shuffle⽂件数量与Spark Core的个数也具有相关性。在上⾯的图例中Job的4个Mapper分为两批运⾏在第⼀批2个Mapper运⾏时申请4个Bucket产⽣4个Shuffle⽂件在第⼆批Mapper运⾏时由于只有⼀个Mapper申请的4个bucket并不会再产⽣4个新的⽂件⽽是追加写到之前的其中两个⽂件后⾯这样⼀共只有4个shuffle⽂件⽽在⽂件内部这有6个不同的segment。因此从理论上讲Shuffle Consolidation所产⽣的shuffle⽂件数量为C×R其中C是Spark集群的Core NumberR是Reducer的个数。这⾥的特殊情况是当MC时Consolidation Shuffle所产⽣的⽂件数和之前的实现相同。Consolidation Shuffle显著减少了shuffle⽂件的数量解决了⽂件数量过多的问题但是Writer Handler的Buffer开销过⼤依然没有减少若要减少Writer Handler的Buffer开销只能减少Reducer的数量但是这⼜会引⼊新的问题。2.Shuffle FetchShuffle write阶段写到各个节点的数据Reducer端的节点通过拉取数据进⽽获取需要的数据在Spark中这个叫Fetch。这就需要Shuffle Fetcher将所需的数据拉过来。这⾥的fetch包括本地和远端因为shuffle数据有可能⼀部分存储在本地。Spark使⽤两套框架实现Shuffle FetcherNIO通过Socket连接去fetch数据OIO通过Netty去Fetch数据分别对应的类是BasicBlockFetcherIterator和NettyBlockFetcherIterator。Spark的团队最终还是想⽤⼀个NIO的通信层来解决问题但是经过性能测试在⼀些特定情况下如集群CPU核数很多地进⾏⼤规模Shuffle时NIO性能表现不如OIO所以Spark开发团队⽬前选择让⼆者共存。图4-25以reduceByKey为例介绍这个算⼦对应的Shuffle Fetch阶段。这个Job分为两个Stage在Stage1和Stage0之间做Shuffle Fetch的操作。HadoopRDD的每个B代表HDFS的⼀个分区读⼊后通过映射转化为MapPartitionsRDD做完Shuffle Write之后Shuffle数据按照Bucket存储磁盘。Stage0的每个Task通过元数据知道数据存储在哪个节点到该节点Fetch需要的指定Key的数据。在Stage0将Fetch到的数据形成分区所有分区形成ShuffledRDD。通过聚集函数将ShuffledRDD每个分区中的每条数据存储到AppandOnlyMap其本质可以理解为⼀个哈希表中在这个过程中执⾏⽤户定义的聚集函数做聚集操作。最后将形成的结果形成分区所有分区形成MapPartitionsRDD。Shuffle Fetch和聚集Aggregate的操作过程是边Fetch数据边处理⽽不是⼀次性Fetch完再处理。通过Aggregate的数据结构 AppandOnlyMap⼀个Spark封装的哈希表。Shuffle Fetch得到⼀条Key-Value对直接将其放进AppandOnlyMap中。如果该HashMap已经存在相应的Key那么直接处理⽤户⾃定义聚集函数合并聚集数据。3.Shuffle Aggregator接下来介绍Aggregator聚集。我们都知道在Hadoop MapReduce的Shuffle过程中Shuffle Fetch过来的数据会进⾏归并排序merge sort使得相同Key下的不同Value按序归并到⼀起供Reducer使⽤但是Spark认为并不是所有的情况下Aggregator都需要排序强制的排序只会增加不必要的开销。下⾯介绍Spark的聚集是怎样实现的。Spark的聚集⽅式分为两种不需要外排和需要外排的。不需要外排的聚集在内存中的AppendOnlyMap中对数据进⾏聚集⽽需要外排的聚集先在内存做聚集当内存数据达到阈值时将数据排序后写⼊磁盘由于磁盘的每部分数据只是整体的部分数据最后再将磁盘数据全部进⾏合并和聚集。实现上分别采⽤了不同⾃定义容器收集聚集。Aggregator采⽤封装好的数据容器存储Key-Value本质上是⼀个哈希表来存储。图4-26是AppendOnlyMap不需要外排的聚集。容器本质上可以理解为⼀个HashMap。当要增加数据时⾸先对关键字进⾏哈希运算查找存放位置如果存放位置已经被占⽤则通过探测⽅法来找下⼀个空闲位置。图4-26中如果插⼊Key1-Value3则冲突两次需要再哈希两次找到新位置插⼊数据。当进⾏迭代AppendOnlyMap中的元素时从前到后扫描输出。如果Array的利⽤率达到70%就扩张⼀倍并对所有Key进⾏再哈希后重新排列每个Key的位置。当⽤户计算count时它会更新shuffle fetch到的每⼀个Key-Value对数据插⼊Map中若在Map中没有查找到则插⼊其中若查找到则更新value值。数据来⼀个处理⼀个减少了不必要的排序开销。但同时需要注意Reducer的内存必须⾜以存放这个分区的所有Key和count值因此需要Worker节点保证提供⾜够内存。需要外排的聚集的原因是如果是Reduce型的操作则数据不断被计算合并数据量不会暴增。考虑⼀下如果是groupByKey这样的操作Reducer需要得到Key对应的所有Value。Spark需要将Key-Value全部存放在Hashmap中并将Value合并成⼀个数组。为了能够存放所有数据必须确保每⼀个分区⾜够⼩内存能够容纳这个分区。因此官⽅建议涉及这类操作时尽量增加分区数量也就是增加Mapper和Reducer的数量。增加Mapper和Reducer的数量可以减⼩分区的⼤⼩使得内存可以容纳这个分区。Bucket的数量由Mapper和Reducer的数量决定Task越多Bucket增加得越多由此带来Writer所需的Buffer缓存也会更多。增加Task数量⼜会带来缓冲开销更⼤的问题正是这个原因Spark提供了外排⽅案。下⾯通过源码剖析内排和外排两种⽅式的选择逻辑。下⾯代码为Aggregator类其封装相应的聚集函数逻辑DeveloperApi case class Aggregator[K V C] createCombiner V C mergeValue C V C mergeCombiners C C C { /*此处决定内存容量不⾜时是否采⽤外排的⽅式⽽这⼜是通过这个参数来确定的*/ private val externalSorting SparkEnv.get.conf.getBooleanspark.shuffle.spill true …… def combineValuesByKeyiter Iterator[_ Product2[K V]] context TaskContext Iterator[K C] { if externalSorting { /*此处使⽤了内部优化的数据结构combiners存储了combiner的集合每个combiner代表⼀个Key和对应 Key的元素Seq*/ val combiners new AppendOnlyMap[KC] var kv Product2[K V] null val update hadValue Boolean oldValue C { /*看处理的是否是第⼀个元素如果是则需要创建集合结构如果不是第⼀个则插⼊原来创建的结构中*/ if hadValue mergeValueoldValue kv._2 else createCombinerkv._2 } while iter.hasNext { kv iter.next /*如果不采⽤外排则调⽤AppendOnlyMap的聚集数据结构进⾏存储有兴趣可以看具体的实现*/ combiners.changeValuekv._1 update } combiners.iterator } else { val combiners new ExternalAppendOnlyMap[K V C]createCombiner mergeValue mergeCombiners while iter.hasNext { val k v iter.next /*如果采⽤外排则使⽤ExternalAppendOnlyMap这个Spark定义的数据结构存储聚集数据*/ combiners.insertk v } } }[1] 注意在Spark中没有明确限定是Map过程还是Reduce过程由于整个逻辑过程在Hadoop已经成为事实公认的标准可以迁移原有知识理解。4.7 本章⼩结本章介绍了Spark的内部运⾏机制。主要介绍了Spark的执⾏机制和调度机制包括调度与任务分配机制、I/O机制、通信机制、容错机制和Shuffle机制。Spark在执⾏过程中由Driver控制应⽤⽣命周期。调度中Spark采⽤了经典的FIFO和FAIR等调度算法对内部的资源实现不同级别的调度。在Spark的I/O中将数据抽象以块为单位进⾏管理RDD中的⼀个分区就是需要处理的⼀个块。集群中的通信对于命令和状态的传递极为重要Spark通过AKKA框架进⾏集群消息通信。Spark通过Lineage和Checkpoint机制进⾏容错性保证Lineage进⾏重算操作Checkpoint进⾏数据冗余备份。最后介绍了Spark中的Shuffle机制Spark也借鉴了MapReduce模型但是其Shuffle机制进⾏了创新与优化。通过阅读本章读者可以深⼊了解Spark的内部原理这对上层应⽤开发与性能调优是⼗分重要的。介绍完Spark内部的执⾏执⾏机制相信读者已经跃跃欲试希望开发⾃⼰的Spark程序下⾯章节将引导读者配置Spark开发环境然后介绍Spark的编程实战。