RDD的操作算子除了单值型还有键值对(Key-Value)型。这里开始介绍键值对型的算子,主要包括groupByKey、combineByKey、reduceByKey、sortByKey、cogroup和join,如表3-5所示。
表3-5 键值对型Transformation算子
方法名 | 方法定义 |
groupByKey | def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] |
combineByKey | def combineByKey[C](createCombiner: V => C, mergeValue: (C,V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] |
reduceByKey | def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] |
sortByKey | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] |
cogroup | def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD |
join | def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] |
1.groupByKey
类似groupBy方法,作用是把每一个相同Key值的的Value聚集起来形成一个序列,可以使用默认分区器和自定义分区器,但是这个方法开销比较大,如果想对同一Key进行Value的聚合或求平均,则推荐使用aggregateByKey或者reduceByKey。
方法源码实现:
def groupByKey(numPartitions:Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
}
def groupByKey(partitioner:Partitioner): RDD[(K, Iterable[V])] = {
//groupByKey不应该使用map端的combine操作,因为map端并不会减少shuffle的数据,还要求
//所有map端的数据都插入hash表中,导致很多对象进入内存中的老年代。
val createCombiner = (v: V) =>CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v:V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
createCombiner, mergeValue,mergeCombiners, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
【例3-19】groupByKey方法应用样例
这个例子先创建包含List集合对象的RDD,然后使用keyBy方法生成Key-Value键值对,然后调用groupByKey方法将相同Key的Value聚合,最后调用collect方法以数组形式输出。
图3-7 groupByKey方法应用样例
2.combineByKey
combineByKey方法能高效的将键值对形式的RDD按相同的Key把Value合并成序列形式,用户能自定义RDD的分区器和是否在map端进行聚合操作。
方法源码实现:
defcombineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue,mergeCombiners, defaultPartitioner(self))
}
defcombineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C, //输入2个不同类型参数,返回其中一个类型参数
mergeCombiners: (C, C) => C, //输入2个同类型参数,返回一个参数
numPartitions: Int): RDD[(K, C)] = {
combineByKey(createCombiner,mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
}
def combineByKey[C](createCombiner:V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)]= {
require(mergeCombiners!= null, "mergeCombiners must be defined") // required as of Spark0.9.0
if(keyClass.isArray) {
if(mapSideCombine) {
throw new SparkException("Cannot usemap-side combining with array keys.")
}
if(partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Defaultpartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)){
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context,aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self,partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
【例3-20】combineByKey方法应用样例
在使用zip方法得到键值对序列c后调用combineByKey,把相同Key的value进行合并到List中。这个例子中使用三个参数的重载方法,该方法第一个参数是createCombiner,作用是把元素V转换到另一类型元素C,该例子中使用的是List(_),表示将输入的元素放在list集合中;mergeValue的含义是把元素V合并到元素C中,在该例子中使用的是函数是x:List[String],y:String) => y :: x,表示将y字符串合并到x链表集合中;mergeCombiners含义是将两个C元素合并,在该例子中使用的是x:List[String], y:List[String]= x ::: y,表示把x链表集合中的内容合并到y链表集合中。
3.reduceByKey
使用一个reduce函数来实现对相同Key的Value的聚集操作,在发送结果给reduce前会在map端的执行本地merge操作。该方法的底层实现就是调用combineByKey方法的一个重载方法。
方法源码实现:
def reduceByKey(partitioner:Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func,partitioner)
}
def reduceByKey(func: (V, V)=> V, numPartitions: Int): RDD[(K, V)] = {
reduceByKey(newHashPartitioner(numPartitions), func)
}
def reduceByKey(func: (V, V)=> V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
【例3-21】reduceByKey方法应用样例
这个例子先用map方法映射出键值对,然后调用reduceByKey方法对相同Key的Value值进行累加。例子中第一个是使用字符串,故使用聚合相加后是字符串的合并;第二个例子使用的是数字,结果是对应Key的Value数字相加。
4.sortByKey
这个函数会根据Key值对键值对进行排序,如果Key是字母,则按字典顺序排序,如果Key是数字,则从小到大排序(或从大到小),该方法的第一个参数控制是否为升序排序,当为true时是升序,反之为降序。
方法源码实现:
def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.size) : RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions,self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) orderingelse ordering.reverse)
}
【例3-22】sortByKey方法应用样例
这个例子先通过zip方法得到包含键值对的变量c,然后演示了sortByKey方法中参数为true和false时的计算结果。本例中的key是字符串,故可以看出当Key为true时,结果是按Key的字典顺序升序输出,反之则为降序输出结果;当key为数字的时候,则按大小排列。
5.cogroup
cogroup是一个比较高效的函数,能根据Key值聚集最多3个键值对的RDD,把相同Key值对应的Value聚集起来。
方法源码实现:
//参数为一个RDD情况
def cogroup[W](other: RDD[(K,W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] = {
if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Defaultpartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self,other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W]])
}
}
//参数为两个RDD情况
def cogroup[W1, W2](other1:RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K,(Iterable[V], Iterable[W1], Iterable[W2]))] = {
if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Defaultpartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self,other1, other2), partitioner)
cg.mapValues { case Array(vs, w1s, w2s)=>(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]])
}
}
//参数为3个RDD情况
def cogroup[W1, W2, W3](other1:RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1],Iterable[W2], Iterable[W3]))] = {
if (partitioner.isInstanceOf[HashPartitioner]&& keyClass.isArray) {
throw new SparkException("Defaultpartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self,other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s)=>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}
【例3-23】cogroup方法应用样例
例子中有两个个小例子,依次是单个参数和两个参数的情况,使用cogroup方法对单个RDD和2个RDD进行聚集操作。
6.join
对键值对的RDD进行cogroup操作,然后对每个新的RDD下Key的值进行笛卡尔积操作,再对返回结果使用flatMapValues方法,最后返回结果。
方法源码实现:
def join[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other,partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <-pair._2.iterator) yield (v, w)
)
}
【例3-24】join方法应用样例
这个例子先构造两个包含键值对元素的变量b和d,然后调用join方法,得到join后的结果,根据源码实现,join方法本质是cogroup方法和flatMapValues方法的组合,其中cogroup方法得到聚合值,flatMapValues方法实现的是笛卡尔积,笛卡尔积的过程是在各个分区内进行,如例子中的Key等于2分区,wc与(wc,zq)求笛卡尔积,得到(2,(wc,wc))和(2,(wc,zq))的结果。
图3-8 join方法应用样例
3.5.3 Action算子
当Spark的计算模型中出现Action算子时才会执行提交作业的runJob动作,这时会触发后续的DAGScheduler和TaskScheduler工作。这里主要讲解常用的Action算子,有collect、reduce、take、top、count、takeSample、saveAsTextFile、countByKey、aggregate,具体方法和定义如表3-6所示。
表3-6 Action算子
方法名 | 方法定义 |
collect | def collect(): Array[T] |
reduce | def reduce(f: (T, T) => T): T |
take | def take(num: Int): Array[T] |
top | def top(num: Int)(implicit ord: Ordering[T]): Array[T] |
count | def count(): Long |
takeSample | def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T] |
saveAsTextFile | def saveAsTextFile(path: String) |
countByKey | def countByKey(): Map[K, Long] |
aggregate | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U |
1.collect
collect方法的作用是把RDD中的元素以数组的方式返回。
方法源码实现:
def collect(): Array[T] = {
val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
【例3-25】collect方法应用样例
这个例子直接把RDD中的元素转换成数组返回。
2.reduce
reduce方法使用一个带两个参数的函数把元素进行聚集,返回一个元素结果,注意该函数中的二元操作应该满足交换律和结合律,这样才能在并行系统中正确计算。
方法源码实现:
def reduce(f: (T, T) => T): T = { //输入是两个参数的函数,返回一个值
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] =>Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult:Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value,taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
//获得Option的最后结果,或者当RDD为空时抛出异常
jobResult.getOrElse(throw newUnsupportedOperationException("empty collection"))
}
【例3-26】reduce方法应用样例
这个例子使用简单的函数将输入的元素相加,过程是先输入前两个元素相加,然后将得到的结果与下一个输入元素相加,依次规则计算出所有元素的和。
3.take
take方法会从RDD中取出前n[1]个元素。方法是先扫描一个分区并后从分区中得到结果,然后评估得到的结果是否达到取出元素个数,如果没达到则继续从其他分区中扫描获取。
方法源码实现:
def take(num: Int): Array[T] = {
if (num == 0) {
return new Array[T](0)
}
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num &&partsScanned < totalParts) {
// numPartsToTry表示在这个迭代中尝试的分区数,这个数可以比总分区数大,因为在runJob中的总分区会限定它的值。
var numPartsToTry = 1
if (partsScanned > 0) {
//如果没有在之前的迭代中找到记录,则会重复寻找(次数翻四倍),此外还会调整分
区数,最多调整涨幅不超过50%
if (buf.size == 0) {
numPartsToTry = partsScanned * 4
} else {
// the left side of max is >=1whenever partsScanned >= 2
numPartsToTry = Math.max((1.5 * num *partsScanned / buf.size).toInt - partsScanned, 1)
numPartsToTry =Math.min(numPartsToTry, partsScanned * 4)
}
}
val left = num - buf.size
val p = partsScanned untilmath.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
res.foreach(buf ++= _.take(num -buf.size))
partsScanned += numPartsToTry
}
buf.toArray
}
【例3-27】take方法应用样例
这里例子分别演示了字母和数字情况,其实工作原理都相同,即从分区中按先后顺序拿元素出来。
4.top
top方法会利用隐式排序转换方法(见实现源码中implicit修饰的方法)来获取最大的前n个元素。
方法源码实现:
def top(num: Int)(implicit ord:Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
def takeOrdered(num:Int)(implicit ord: Ordering[T]): Array[T] = {
if (num == 0) {
Array.empty
} else {
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements,so let's reverse the ordering.
val queue = newBoundedPriorityQueue[T](num)(ord.reverse)
queue ++=util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.size == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}
}
【例3-28】top方法应用样例
例子显示了top的使用方法,很简洁,直接输入元素个数作为参数就能得到前n个元素的值。
5.count
count方法计算并返回RDD中元素的个数。
方法源码实现:
def count(): Long =sc.runJob(this, Utils.getIteratorSize _).sum
def runJob[T, U: ClassTag](rdd:RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size,false)
}
【例3-29】count方法应用样例
6.takeSample
takeSample方法返回一个固定大小的数组形式的采样子集,此外还把返回的元素顺序随机打乱,方法的三个参数含义依次是否放回数据、返回取样的大小和随机数生成器的种子。
方法源码实现:
def takeSample(withReplacement:Boolean,
num: Int,
seed: Long = Utils.random.nextLong):Array[T] = {
val numStDev = 10.0
if (num < 0) {
throw newIllegalArgumentException("Negative number of elements requested")
} else if (num == 0) {
return new Array[T](0)
}
val initialCount = this.count()
if (initialCount == 0) {
return new Array[T](0)
}
val maxSampleSize = Int.MaxValue -(numStDev * math.sqrt(Int.MaxValue)).toInt
if (num > maxSampleSize) {
throw new IllegalArgumentException("Cannotsupport a sample size > Int.MaxValue - " +
s"$numStDev *math.sqrt(Int.MaxValue)")
}
val rand = new Random(seed)
if (!withReplacement && num >=initialCount) {
returnUtils.randomizeInPlace(this.collect(), rand)
}
valfraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
withReplacement)
var samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()
//如果采样容量不够大,则继续采样
var numIters = 0
while (samples.length < num) {
logWarning(s"Needed to re-sample dueto insufficient sample size. Repeat #$numIters")
samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()
numIters += 1
}
Utils.randomizeInPlace(samples,rand).take(num)
}
【例3-30】takeSample方法应用样例
这个例子直接使用takeSample方法,得到30个固定数字的样本,采取有放回抽样的方式。
7.saveAsTextFile
把RDD存储为文本文件,一次存一行。
方法源码实现:
def saveAsTextFile(path:String) {
val nullWritableClassTag =implicitly[ClassTag[NullWritable]]
val textClassTag =implicitly[ClassTag[Text]]
val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))
rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
def saveAsTextFile(path:String, codec: Class[_ <: CompressionCodec]) { //参数可选择压缩方式
// 参考https://issues.apache.org/jira/browse/SPARK-2075
val nullWritableClassTag =implicitly[ClassTag[NullWritable]]
val textClassTag =implicitly[ClassTag[Text]]
val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))
rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
【例3-31】saveAsTextFile方法应用样例
8.countByKey
类似count方法,不同的是countByKey方法会根据相同的Key计算其对应的Value个数,返回的是map类型的结果。
方法源码实现:
def countByKey(): Map[K, Long]= self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
【例3-32】countByKey方法应用样例
这个例子先构造键值对变量a,然后使用countByKey方法对相同Key的Value进行统计,过程是先调用mapValue方法把Value映射为1,再reduceByKey到Key和其对应Value的个数。
9.aggregate
aggregate方法先将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
aggregate有两个函数seqOp和combOp,这两个函数都是输入两个参数,输出一个参数,其中seqOp函数可以看成是reduce操作,combOp函数可以看成是第二个reduce操作(一般用于combine各分区结果到一个总体结果),由定义,combOp操作的输入和输出类型必须一致。
方法源码实现:
def aggregate[U:ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
//克隆zero值,因为这个值也会被序列化
var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T])=> it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult:U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition,mergeResult)
jobResult
}
【例3-33】aggregate方法应用样例
在spark中一个分区对应一个task,从源码来看,zeroValue参与每个分区的seqOp(reduce)方法和最后的combOp(第二个reduce)方法,先对每个分区求reduce,在该例子中是对3个分区分别求Max操作,得到分区最大值,得到的结果参与combOp方法,即把各分区的结果和zeroValue相加最后得到结果值,从前两个例子可以看出这个操作特点,体现先分后总的思想。
对于后面两个例子使用的是字符串,aggregate方法的思路一样,先对各分区求seqOp方法然后再使用combOp方法把各分区的结果聚合相加,得到最终结果。
10.fold
fold方法与aggregate方法原理类似,区别就是少了一个seqOp方法。fold方法是把每个分区的元素进行聚合,然后调用reduce(op)方法处理。
方法源码实现:
def fold(zeroValue: T)(op: (T,T) => T): T = {
//克隆zero值,因为这个值也会被序列化
var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T])=> iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult:T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
【例3-34】fold方法应用样例
这个例子中的使用方式与aggregate方法非常相似,注意zeroValue参与所有分区计算。fold计算是保证每个分区能独立计算,它与aggregate最大的区别是aggregate对不同分区提交的最终结果定义了一个专门的comOp函数来处理,而fold方法是采用一个方法来处理aggregate的两个方法过程。
3.6 本章小结
本章主要为读者讲述了Spark核心开发部分,其中讲述了SparkContext的作用与创建过程,还对RDD的概念模型进行介绍,说明了RDD的Transformation和Action操作的内涵意义。在基本介绍Spark编程模型后在实践环节列出了主要的Transformation和Action方法的使用范例,同时结合了方法源码说明范例计算过程。本章为Spark应用基础,第六章将继续集合源码深入介绍RDD的运行机制和Spark调度机制。下一章将逐一介绍Spark的四大编程模型,让读者进一步学习并掌握Spark在不同业务场景下的应用。
本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,在此以知识共享为初衷公开部分内容,如有兴趣,请支持正版书籍。
- 本文固定链接: http://qiantao.net.cn/?id=221
- 转载请注明: admin 于 千淘万漉 发表
《本文》有 0 条评论