如无特别说明,本文源码版本为 spark 2.3.4
创建rdd有三种方式,一种是通过SparkContext.textFile()访问外部存储创建,一种是通过输入数据集合通过调用 SparkContext.parallelize() 方法来创建,最后一种是通过转换已有的rdd生成新的rdd。
通过parallelize创建rdd的分区数量分析
通过parallelize的方式比较简单,相信也是大部分初学者第一次接触创建rdd的方法,那么通过这个方法创建的rdd的默认分区数是多少呢?我们通过源码进行分析。
1 | package org.apache.spark.SparkContext |
我们先看看parallelize是如何生成rdd的。可以看到它是通过 ParallelCollectionRDD 类创建一个rdd,其内部返回的partitioner是通过ParallelCollectionRDD伴生对象的slice方法分割seq为一个二维的Seq[Seq[T]],并把这个二维的序列传递到ParallelCollectionPartition中实例化的。
接下来是关键,defaultParallelism
的默认值确定了分区的数量。
1 | package org.apache.spark.SparkContext |
通过以上分析,我们可知通过parallelize创建rdd时,分区数量根据以下情况确定
- 如果部署模式为local:
- 如果定义了
spark.default.parallelism
则以其值作为分区大小 - 如果没有定义
spark.default.parallelism
,则以解析master参数中指定的值为分区大小
- 如果定义了
- 如果部署模式为standalone:
- 如果定义了
spark.default.parallelism
则以其值作为分区大小 - 如果没有定义
spark.default.parallelism
,则为math.max(totalCoreCount, 2),其中totalCoreCount为executor注册的所拥有core数量,不一定是申请core的总数。
- 如果定义了
TODO yarn模式的还未考虑,以后有时间加进来
对现有rdd进行transformation后分区数量分析
上一小节通过分析后台调度器的相关源码,我们已经知道通过parallelize创建rdd时partition的确定方法。这一节我们探讨通过转换前后分区数量如何确定。
以map()为例
1 | package org.apache.spark.rdd |
这里需要注意区分Partitioner和Partition。Partitioner是分区器,需要定义分区的数量numPartitions,以及通过传入key决定其在哪个partition的getPartition(key: Any)方法。而Partition则描述了当前rdd的分区状态,对于map而言其分区状态和父rdd一致。当然rdd也可以没有Partitioner就有Parition的情况,如默认情况下经过map转换的rdd,以及本文第一部分描述通过parallelize创建rdd,都是没有partitioner,其partitioner为None。
通过追溯firstParent,可知firstParent <- dependencies.head <- dependencies_.head <- getDependencies.head <- deps.head <- List(new OneToOneDependency(pre).head (这里完成rdd到dependency的转换),其中pre为调用map方法的rdd,即 MapPartitionsRDD 的父rdd。
回到map的paritions数量为多少的问题,从源码中也能看到其partitions将保持血统中第一个的父类的partition,不会改变原有的分区情况。但是也不会保留原有的分区器。
而类似的,flatMap的实现也和map一致。filter也差不多,由于其不会更改父rdd的key,所以preservesPartitioning为true,保留了血统中第一个父类的partitioner。
以reduceByKey()为例
1 | package org.apache.spark.rdd |
对于reduceByKey方法,当不传numPartitions参数时,其默认的分区器由defaultPartitioner()方法决定,分区器就决定了分区数。
defaultPartitioner()的决定分区器规则总结如下:
- 如果定义了”spark.default.parallelism”,则defaultNumPartitions = “spark.default.parallelism” ;如果未定义,则defaultNumPartitions等于所有rdd分区中最大的分区数
- 如果在所有rdd中有对应的partitioner,则选出分区数量最大的partitioner,并且该partitioner的分区数满足以下两个条件之一,则返回该partitioner作为API的partitioner
- 分区数量是合理的
- 分区数量大于defaultNumPartitions
- 否则,返回HashPartitioner(defaultNumPartitions)
总结,对于reduceByKey等类似的API而言,只要是通过defaultPartitioner()定义分区器的,其分区数量有三种情况:
- 等于默认值spark.default.parallelism
- 等于所有rdd中最大partition数量
- 等于所有partitioner中最大partition数量
也可以看出此类型的转换,partition数量总是趋向于变大,而”spark.default.parallelism”是个平衡点。
如果定义了”spark.default.parallelism”:
- 如果它定义的很小,对于没有分区器则分区数量很小。对于有分区器,defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions几乎永远为true,将保持最大分区器的分区数量,不会主动干预原来的分区情况。
- 如果它定义的很大,对于没有分区器则分区数量很大。对于有分区器,defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions几乎永远为false,结果依赖于最大分区器的分区数量小于分区数量最大的rdd的程度,如果相差不大则保留原来的分区器,如果相差很大,则以”spark.default.parallelism”作为新分区大小。
如果没定义”spark.default.parallelism”:
- 对于没有分区器,则分区数量等于所有rdd中最大partition数量。
- 对于有分区器,defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions永远为false,结果依赖于最大分区器的分区数小于分区数量最大的rdd的程度,如果相差不大则保留原来的分区器,如果相差很大,则以所有rdd的最大分区数为新分区大小。
保持partitioner的transformation
如上所述,rdd的parittioner是决定分区数量的重要因素,对于以下transformation 默认 将会保留和传播partitioner:
- cogroup
- groupWith
- join
- leftOuterJoin
- rightOuterJoin
- groupByKey
- reduceByKey
- foldByKey
- combineByKey
- partitionBy
- mapValues
- flatMapValues
- filter
其他transfermation将默认不保持分区器。因为其他操作(比如map)可能会修改key,修改了key后,原来的分区器就失去了它的意义。相反的,mapValues只修改value不修改key,所以其保留和传播分区器是合理的。
参考
https://github.com/rohgar/scala-spark-4/wiki/Partitioning
TODO 通过文件创建rdd还未考虑,以后有时间加进来
本文为学习过程中产生的总结,由于学艺不精可能有些观点或者描述有误,还望各位同学帮忙指正,共同进步。