class SparkContext(config: SparkConf) extends Logging {

  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

  /**
   * Distribute a local Scala collection to form an RDD.
   *
   * @param seq the local collection to distribute
   * @param numSlices number of partitions to cut the collection into
   *                  (defaults to `defaultParallelism`)
   * @return an RDD backed by `seq`, split into `numSlices` partitions
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    // No preferred locations for any partition, hence the empty location map.
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
}
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  /** Construct an RDD with just a one-to-one dependency on one parent. */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Get the list of dependencies of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def dependencies: Seq[Dependency[_]] = {
    // A checkpointed RDD replaces its original lineage with a single
    // one-to-one dependency on the checkpoint data.
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        // Compute and cache lazily, exactly once: getDependencies may be
        // expensive and its contract guarantees a single invocation.
        // (Previously this assignment was duplicated, calling it twice.)
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

  /** Returns the first parent RDD (head of the dependency list). */
  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }
}
/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 *
 * Note that, partitioner must be deterministic, i.e. it must return the same partition id given
 * the same partition key.
 */
abstract class Partitioner extends Serializable {

  /** Total number of partitions produced by this partitioner. */
  def numPartitions: Int

  /** Returns the partition ID, in `[0, numPartitions)`, that the given key is assigned to. */
  def getPartition(key: Any): Int
}
object Partitioner {

  /**
   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
   *
   * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
   * as the default partitions number, otherwise we'll use the max number of upstream partitions.
   *
   * When available, we choose the partitioner from rdds with maximum number of partitions. If this
   * partitioner is eligible (number of partitions within an order of maximum number of partitions
   * in rdds), or has partition number higher than default partitions number - we use this
   * partitioner.
   *
   * Otherwise, we'll use a new HashPartitioner with the default partitions number.
   *
   * Unless spark.default.parallelism is set, the number of partitions will be the same as the
   * number of partitions in the largest upstream RDD, as this should be least likely to cause
   * out-of-memory errors.
   *
   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
   */
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = rdd +: others

    // Candidates that already carry a partitioner with at least one partition.
    val withPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

    // Among those candidates, pick the one with the most partitions (if any).
    val largestPartitioned: Option[RDD[_]] =
      if (withPartitioner.isEmpty) None
      else Some(withPartitioner.maxBy(_.partitions.length))

    // Fallback partition count: spark.default.parallelism when configured,
    // otherwise the largest partition count among the upstream RDDs.
    val defaultNumPartitions =
      if (rdd.context.conf.contains("spark.default.parallelism")) {
        rdd.context.defaultParallelism
      } else {
        rdds.map(_.partitions.length).max
      }

    // Reuse the existing max partitioner when it is eligible, or when it has
    // more partitions than the default; otherwise hash-partition with the default count.
    largestPartitioned match {
      case Some(candidate)
          if isEligiblePartitioner(candidate, rdds) ||
            defaultNumPartitions < candidate.getNumPartitions =>
        candidate.partitioner.get
      case _ =>
        new HashPartitioner(defaultNumPartitions)
    }
  }

  /**
   * Returns true if the number of partitions of the RDD is either greater than or is less than and
   * within a single order of magnitude of the max number of upstream partitions, otherwise returns
   * false.
   */
  private def isEligiblePartitioner(
      hasMaxPartitioner: RDD[_],
      rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    // Eligible when the upstream maximum is less than one order of magnitude
    // (base 10) above the candidate partitioner's partition count.
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }
}