Home > Archives > Spark基础之coalesce和repartition

Spark基础之coalesce和repartition

Published on

在Spark中,如果想要对分区数进行调整,我们一般会使用coalescerepartition这两个方法。

coalesce主要是用于减小RDD中的分区(当然增加也是没有问题的), 比较典型的使用场景:

而repartition既可以用来增加分区也可以用来减少分区,不过一般伴随着RDD分区的shuffle。

之前对于这两个方法的理解仅限于此,但今天一个同事再做一个任务的时候,将这两种操作的区别进一步展示出来了。逻辑其实很简单对于1000个分区分别进行筛选操作,将最终结果压缩到1个分区存入HDFS系统中。他最开始使用coalesce的时候总是会显示内存不够,后来改为repartition之后才完成了。

后来在源码文档中发现了下面这句话:

However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1)

最后结合Stage的划分弄清楚了这个问题,以下面的简化实例予以说明。

// textFile --> 默认读取的是1000个分区
val rdd = sc.textFile(bigFile).filter(...).coalesce(1).saveAsTextFile(...)
val rdd = sc.textFile(bigFile).filter(...).repartition(1).saveAsTextFile(...)

对于coalesce,前面三个阶段都是窄依赖所以可以合并,但由于最终只有一个分区,在节点上计算父RDD(HadoopRDD)的分区时候需要从各个节点拉取数据,然后做过滤操作,最终导致计算节点因内存不足而任务失败。

而repartition因为有shuffle操作,所以被分割成为两个Stage。筛选操作发生在各个节点(筛选力度较大,导致数据量减小),然后通过shuffle read转移到单个节点,所以内存占用会小很多。

总结一句就是,如果通过coalesce(默认行为)骤降分区数,计算会发生在很少的节点上,可能会由于内存不足导致任务失败,所以要慎用

后来通过阅读coalesce的源代码,对于减小分区时,它是如何构建新分区和原分区的关系(新分区以原RDD中的哪几个分区作为父分区,需不需要考虑就近原则,需不需要考虑负载等等)的又有了新的了解。

首先,这两种方法都产生了CoalescedRDD,只不过repartition过程产生了一个中间RDD也就是ShuffledRDD。我们知道在RDD的计算过程中,都会有一个方法是获取其中的分区,然后逐个追溯父分区,然后计算到当前分区,所以getPartitions便是建立分区之间关系的核心。

// balanceSlac --> 负载和就近原则之间的平衡, 1.0 is all locality, 0 is all balance
private[spark] class CoalescedRDD[T: ClassTag](
    prev: RDD[T], 
    maxPartitions: Int, 
    balanceSlack: Double = 0.1
) {
    ...
    override def getPartitions: Array[Partition] = {
        val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
    }
    ...
}

private class PartitionCoalescer(
    maxPartitions: Int, prev: RDD[_], balanceSlack: Double
) { 
    ...
    val groupArr = ArrayBuffer[PartitionGroup]()
    
    // 如果是true的话,父RDD中的分区就不存在preferredLocations
    var noLocality = true
    ...
}

CoalescedRDD构建的过程中,会生成M个PartitionGroup,然后把原来的Partition按照规则放入PartitionGroup中,实际上PartitionGroup对应的就是新生成的CoalesceRDD中的一个Partition,只不过它依赖父RDD中的多个Partition,也就是计算的时候从固定个父RDD的Partition中取数据(窄依赖)。

而这种实现包含了一个重要的算法balls-into-bins – 它解决的是将m个球放入n个盒子。每一次,我们会将一个球放入到其中一个盒子中。当所有的球都在盒子中了,汇总每个盒子中球的个数,这个数字也称为该盒子的负载,我们要得出的结论是单个盒子中最大的负载是多少?

对应来看就是现有prev.partitions.size个分区,我们产生math.min(prev.partitions.size, maxPartitions)个分区组(盒子)。然后将这些分区放进去,放的时候需要在两个原则之间进行权衡: 就近原则(locality)和负载均衡(load balance)。也就是说如果PartitionGroup在server A,有一个待放入的Partition也在server A上,如果从就近原则考虑应该放在server A,但如果此时server A的负载已经很重,我们需要找出其它负载较小的ParttitionGroup然后放进去。

首先,目标分区数targetLen(PartitionGroup的组数)是原RDD分区数和传入的分区数之间的最小值。

(1) 产生PartitionGroup – CoalescedRDD.setupGroups

因为分组是就近原则和负载均衡的一个权衡,所以首先会获取父RDD中每一个分区的最优计算位置。

class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
      def resetIterator(): Iterator[(String, Partition)] = {
        ...
        prev.partitions.iterator.flatMap(p => {
          if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
        } )
        ...
    }
}

如果父RDD的分区没有优先位置,则新建targetLen个PartitionGroup。

(1 to targetLen).foreach { groupArr += PartititonGroup() }

如果父RDD的分区中有优先位置,要么创建出targetLen个唯一并且独立的优先位置,要么经过 expectedCoupons2次迭代,这种迭代涉及到另外一种算法赠券收集问题 – 假设有n种赠券,每种赠券获取概率相同,而且赠券可以无限供应。若取赠券T张,能集齐n种赠券的概率多少?在Spark中实际上就是,通过多少次迭代能获取到所有唯一的host地址,但一般来说PartitionGroup的个数都要大于机器数,所以一个机器上通常有多个PartitionGroup。

 def setupGroups(targetLen: Int) {
    val expectedCoupons2 = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt

    // 每遇到一个host地址,就新增一个PartitionGroup;如果尝试次数结束之后, 
    // PartitionGroup还是不够,则在之前的host上再增加PartitionGroup,然后将Partition放进去
    // 所以在setGroups过程有些Partition就已经提前被放进去了,所以后面才会有负载的考虑
    while (numCreated < targetLen && tries < expectedCoupons2) {
       ....     
    }
    
    while (numCreated < targetLen) {
       ....
    }
 }

(2) 将Partition放入PartitionGroup中 – CoalescedRDD.throwBalls()

如果父RDD的分区没有优先位置(noLocality=true)

// groupArr.size => math.min(prev.partitions.length, maxPartitions)
// 如果coalesce是想扩大分区,则直接将父RDD中的每一个分区放入每一个PartitionGroup
if (maxPartitions > groupArr.size) {
    for ((par, i) <- prev.partitions.zipWithIndex) {
        groupArr(i).arr += p
    } else {
        // 如果coalesce的操作是缩小分区,则以maxPartitions为步长,将原RDD分区拆成maxPartitions个组
        
        for (i <- 0 until maxPartitions) {
            // 0 / M ~ 1 / M -> 1 / M ~ 2 / M
            val rangeStart = (i * prev.partitions.size) / maxPartitions
            val rangeEnd = ((i + 1) * prev.partitions.size) / maxPartitions
           
           rangeStart until rangeEnd).foreach { j => groupArr(i).arr += prev.partitions(j) }
        }
    
    }
}

如果父RDD的分区有优先位置(noLocality=false),会通过pickBin方法,在the power of two choices in randomized load balancing原则的指导下,来将原RDD中的分区依次放入。

for (p <- prev.partitions if (!initialHash.contains(p))) { 
    pickBin(p).arr += p
}

上面的initialHash充分说明,之前已经有Partition被提前放进PartitionGroup里面了。

(3) 计算时的逻辑

因为coalesce形成的是窄依赖,新RDD分区计算的时候就需要取原RDD中对应的分区(即PartitionGroup中对应的分区)。

override def getPartitions: Array[Partition] = {
    // pc.run() ==> Array[PartitionGroup]
    pc.run().zipWithIndex.map {
      case (pg, i) =>
        val ids = pg.arr.map(_.index).toArray
        
        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    }
}

参考

> randomized load balancing

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方