首页 > 大数据 > spark入门教程(3)--Spark 核心API开发
2017
02-17

spark入门教程(3)--Spark 核心API开发

千淘万漉博客阿里云大使推广链接

 

3.5.2 键值对型Transformation算子


RDD的操作算子除了单值型还有键值对(Key-Value)型。这里开始介绍键值对型的算子,主要包括groupByKeycombineByKeyreduceByKeysortByKeycogroupjoin,如表3-5所示。

3-5 键值对型Transformation算子

方法名

方法定义

groupByKey

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

combineByKey

def combineByKey[C](createCombiner: V => C, mergeValue: (C,V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)]

reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]

cogroup

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD

join

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]


1groupByKey

类似groupBy方法,作用是把每一个相同Key值的的Value聚集起来形成一个序列,可以使用默认分区器和自定义分区器,但是这个方法开销比较大,如果想对同一Key进行Value的聚合或求平均,则推荐使用aggregateByKey或者reduceByKey

方法源码实现:

def groupByKey(numPartitions:Int): RDD[(K, Iterable[V])] = {

    groupByKey(new HashPartitioner(numPartitions))

}

def groupByKey(partitioner:Partitioner): RDD[(K, Iterable[V])] = {

//groupByKey不应该使用map端的combine操作,因为map端并不会减少shuffle的数据,还要求

//所有map端的数据都插入hash表中,导致很多对象进入内存中的老年代。

    val createCombiner = (v: V) =>CompactBuffer(v)

    val mergeValue = (buf: CompactBuffer[V], v:V) => buf += v

    val mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2

    val bufs = combineByKey[CompactBuffer[V]](

      createCombiner, mergeValue,mergeCombiners, partitioner, mapSideCombine=false)

    bufs.asInstanceOf[RDD[(K, Iterable[V])]]

}

【例3-19groupByKey方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(List("mk""zq""xwc""fjg""dcp""snn"), 2)  

  2. val b = a.keyBy(x => x.length)       // keyBy方法调用map(x => (f(x),x))生成键值对  

  3. b.groupByKey.collect  

  4. res6: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fjg, dcp, snn)))  

这个例子先创建包含List集合对象的RDD,然后使用keyBy方法生成Key-Value键值对,然后调用groupByKey方法将相同KeyValue聚合,最后调用collect方法以数组形式输出。



3-7  groupByKey方法应用样例

2.combineByKey

combineByKey方法能高效的将键值对形式的RDD按相同的KeyValue合并成序列形式,用户能自定义RDD的分区器和是否在map端进行聚合操作。

方法源码实现:

defcombineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {

  combineByKey(createCombiner, mergeValue,mergeCombiners, defaultPartitioner(self))

}

defcombineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,        //输入2个不同类型参数,返回其中一个类型参数

   mergeCombiners: (C, C) => C,  //输入2个同类型参数,返回一个参数

   numPartitions: Int): RDD[(K, C)] = {

     combineByKey(createCombiner,mergeValue, mergeCombiners, new HashPartitioner(numPartitions))

}

def combineByKey[C](createCombiner:V => C,

   mergeValue: (C, V) => C,

   mergeCombiners: (C, C) => C,

   partitioner: Partitioner,

   mapSideCombine: Boolean = true,

   serializer: Serializer = null): RDD[(K, C)]= {

     require(mergeCombiners!= null, "mergeCombiners must be defined") // required as of Spark0.9.0

     if(keyClass.isArray) {

       if(mapSideCombine) {

          throw new SparkException("Cannot usemap-side combining with array keys.")

      }

      if(partitioner.isInstanceOf[HashPartitioner]) {

        throw new SparkException("Defaultpartitioner cannot partition array keys.")

      }

    }

    val aggregator = new Aggregator[K, V, C](

      self.context.clean(createCombiner),

      self.context.clean(mergeValue),

      self.context.clean(mergeCombiners))

    if (self.partitioner == Some(partitioner)){

      self.mapPartitions(iter => {

        val context = TaskContext.get()

        new InterruptibleIterator(context,aggregator.combineValuesByKey(iter, context))

      }, preservesPartitioning = true)

    } else {

      new ShuffledRDD[K, V, C](self,partitioner)

        .setSerializer(serializer)

        .setAggregator(aggregator)

        .setMapSideCombine(mapSideCombine)

    }

  }

        

【例3-20combineByKey方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(List("xwc""fjg""wc""dcp""zq""snn""mk""zl""hk""lp"), 2)  

  2. val b = sc.parallelize(List(1,2,2,3,2,1,2,2,2,3), 2)  

  3. val c = b.zip(a)    //把a和b中对应元素组合成键值对,如Array((1,xwc), (3,fjg), (2,wc), (3,dcp)...   

  4. val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String] = x ::: y)  

  5. d.collect  

  6. res13: Array[(Int, List[String])] = Array((2,List(zq, wc, fjg, hk, zl, mk)), (1,List(xwc, snn)), (3,List(dcp, lp)))  


在使用zip方法得到键值对序列c后调用combineByKey,把相同Keyvalue进行合并到List中。这个例子中使用三个参数的重载方法,该方法第一个参数是createCombiner,作用是把元素V转换到另一类型元素C,该例子中使用的是List(_),表示将输入的元素放在list集合中;mergeValue的含义是把元素V合并到元素C中,在该例子中使用的是函数是x:List[String],y:String) => y :: x,表示将y字符串合并到x链表集合中;mergeCombiners含义是将两个C元素合并,在该例子中使用的是x:List[String], y:List[String]= x ::: y,表示把x链表集合中的内容合并到y链表集合中。

3reduceByKey

使用一个reduce函数来实现对相同KeyValue的聚集操作,在发送结果给reduce前会在map端的执行本地merge操作。该方法的底层实现就是调用combineByKey方法的一个重载方法。

方法源码实现:

def reduceByKey(partitioner:Partitioner, func: (V, V) => V): RDD[(K, V)] = {

    combineByKey[V]((v: V) => v, func, func,partitioner)

}

def reduceByKey(func: (V, V)=> V, numPartitions: Int): RDD[(K, V)] = {

    reduceByKey(newHashPartitioner(numPartitions), func)

}

def reduceByKey(func: (V, V)=> V): RDD[(K, V)] = {

    reduceByKey(defaultPartitioner(self), func)

}

【例3-21reduceByKey方法应用样例

[java] view plain copy
  1. 1)val a = sc.parallelize(List("dcp""fjg""snn""wc""zq"), 2)  

  2. val b = a.map(x => (x.length, x))  

  3. b.reduceByKey((a,b) => a + b).collect  

  4. res22: Array[(Int, String)] = Array((2,wczq), (3,dcpfjgsnn))  

  5. 2)val a = sc.parallelize(List(3,12,124,32,5 ), 2)  

  6. val b = a.map(x => (x.toString.length, x))  

  7. b.reduceByKey(_ + _).collect  

  8. res24: Array[(Int, Int)] = Array((2,44), (1,8), (3,124))  


这个例子先用map方法映射出键值对,然后调用reduceByKey方法对相同KeyValue值进行累加。例子中第一个是使用字符串,故使用聚合相加后是字符串的合并;第二个例子使用的是数字,结果是对应KeyValue数字相加。

4sortByKey

这个函数会根据Key值对键值对进行排序,如果Key是字母,则按字典顺序排序,如果Key是数字,则从小到大排序(或从大到小),该方法的第一个参数控制是否为升序排序,当为true时是升序,反之为降序。

方法源码实现:

 def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.size) : RDD[(K, V)] =

 {

    val part = new RangePartitioner(numPartitions,self, ascending)

    new ShuffledRDD[K, V, V](self, part)

      .setKeyOrdering(if (ascending) orderingelse ordering.reverse)

 }

【例3-22sortByKey方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(List("dog""cat""owl""gnu""ant"), 2)  

  2. val b = sc.parallelize(1 to a.count.toInt, 2)       //a.count得到单词的字母个数  

  3. val c = a.zip(b)  

  4. c.sortByKey(true).collect  

  5. res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))  

  6. c.sortByKey(false).collect  

  7. res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))  


这个例子先通过zip方法得到包含键值对的变量c,然后演示了sortByKey方法中参数为truefalse时的计算结果。本例中的key是字符串,故可以看出当Keytrue时,结果是按Key的字典顺序升序输出,反之则为降序输出结果;当key为数字的时候,则按大小排列。

5.cogroup

cogroup是一个比较高效的函数,能根据Key值聚集最多3个键值对的RDD,把相同Key值对应的Value聚集起来。

方法源码实现:

//参数为一个RDD情况

def cogroup[W](other: RDD[(K,W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] = {

    if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {

      throw new SparkException("Defaultpartitioner cannot partition array keys.")

    }

    val cg = new CoGroupedRDD[K](Seq(self,other), partitioner)

    cg.mapValues { case Array(vs, w1s) =>

      (vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W]])

    }

}

//参数为两个RDD情况

def cogroup[W1, W2](other1:RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K,(Iterable[V], Iterable[W1], Iterable[W2]))] = {

    if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {

      throw new SparkException("Defaultpartitioner cannot partition array keys.")

    }

    val cg = new CoGroupedRDD[K](Seq(self,other1, other2), partitioner)

    cg.mapValues { case Array(vs, w1s, w2s)=>(vs.asInstanceOf[Iterable[V]],

        w1s.asInstanceOf[Iterable[W1]],

        w2s.asInstanceOf[Iterable[W2]])

    }

  }

//参数为3个RDD情况

def cogroup[W1, W2, W3](other1:RDD[(K, W1)],

      other2: RDD[(K, W2)],

      other3: RDD[(K, W3)],

      partitioner: Partitioner)

      : RDD[(K, (Iterable[V], Iterable[W1],Iterable[W2], Iterable[W3]))] = {

    if (partitioner.isInstanceOf[HashPartitioner]&& keyClass.isArray) {

      throw new SparkException("Defaultpartitioner cannot partition array keys.")

    }

    val cg = new CoGroupedRDD[K](Seq(self,other1, other2, other3), partitioner)

    cg.mapValues { case Array(vs, w1s, w2s, w3s)=>

       (vs.asInstanceOf[Iterable[V]],

         w1s.asInstanceOf[Iterable[W1]],

         w2s.asInstanceOf[Iterable[W2]],

         w3s.asInstanceOf[Iterable[W3]])

    }

  }

【例3-23cogroup方法应用样例

[java] view plain copy
  1. 1)val a =sc.parallelize(List(1,2,2 ,313), 1)  

  2. val b =a.map(x => (x, "b"))  

  3. val c =a.map(y => (y, "c"))  

  4. b.cogroup(c).collect  

  5. res25:Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b,b),  

  6. CompactBuffer(c,c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))), (2,(CompactBuffer(b,                             b),CompactBuffer(c, c))))  

  7. 2)val d = a.map(m => (m,"x"))  

  8. b.cogroup(c,d).collect  

  9. res26:Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =Array((1,(CompactBuffer(b,b)  

  10. ,CompactBuffer(c,c),CompactBuffer(x, x))), (3,(CompactBuffer(b, b),CompactBuffer(c, c),  

  11. CompactBuffer(x,x))), (2,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(x, x))))  


 

例子中有两个个小例子,依次是单个参数和两个参数的情况,使用cogroup方法对单个RDD2RDD进行聚集操作。

6.join

对键值对的RDD进行cogroup操作,然后对每个新的RDDKey的值进行笛卡尔积操作,再对返回结果使用flatMapValues方法,最后返回结果。

方法源码实现:

def join[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, W))] = {

    this.cogroup(other,partitioner).flatMapValues( pair =>

      for (v <- pair._1.iterator; w <-pair._2.iterator) yield (v, w)

    )

}

【例3-24join方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(List("fjg""wc""xwc","dcp"), 2)  

  2. val b = a.keyBy(_.length)       //得到诸如(3,"fjg"),(2,"wc")的键值对序列  

  3. val c = sc.parallelize(List("fjg""wc""snn""zq""xwc","dcp"), 2)  

  4. val d = c.keyBy(_.length)  

  5. b.join(d).collect  

  6. res29: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fjg)), (3,(fjg,snn)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fjg)), (3,(xwc,snn)), (3,(xwc,xwc)), (3,(xwc,dcp)), (3,(dcp,fjg)), (3,(dcp,snn)), (3,(dcp,xwc)), (3,(dcp,dcp)))  


这个例子先构造两个包含键值对元素的变量bd,然后调用join方法,得到join后的结果,根据源码实现,join方法本质是cogroup方法和flatMapValues方法的组合,其中cogroup方法得到聚合值,flatMapValues方法实现的是笛卡尔积,笛卡尔积的过程是在各个分区内进行,如例子中的Key等于2分区,wc与(wczq)求笛卡尔积,得到(2,wc,wc))和(2,(wc,zq))的结果。



 

3-8  join方法应用样例

3.5.3 Action算子

Spark的计算模型中出现Action算子时才会执行提交作业的runJob动作,这时会触发后续的DAGSchedulerTaskScheduler工作。这里主要讲解常用的Action算子,有collectreducetaketopcounttakeSamplesaveAsTextFilecountByKeyaggregate,具体方法和定义如表3-6所示。

3-6  Action算子

方法名

方法定义

collect

def collect(): Array[T]

reduce

def reduce(f: (T, T) => T): T

take

def take(num: Int): Array[T]

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

count

def count(): Long

takeSample

def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T]

saveAsTextFile

def saveAsTextFile(path: String)

countByKey

def countByKey(): Map[K, Long]

aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

1collect

collect方法的作用是把RDD中的元素以数组的方式返回。

方法源码实现:

def collect(): Array[T] = {

    val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)

    Array.concat(results: _*)

}

【例3-25collect方法应用样例

[java] view plain copy
  1. val c = sc.parallelize(List("a""b""c""d""e""f"), 2)  

  2. c.collect  

  3. res29: Array[String] = Array(a, b, c, d, e, f)  


这个例子直接把RDD中的元素转换成数组返回。

2reduce

reduce方法使用一个带两个参数的函数把元素进行聚集,返回一个元素结果,注意该函数中的二元操作应该满足交换律和结合律,这样才能在并行系统中正确计算。

方法源码实现:

 def reduce(f: (T, T) => T): T = {            //输入是两个参数的函数,返回一个值

    val cleanF = sc.clean(f)

    val reducePartition: Iterator[T] =>Option[T] = iter => {

      if (iter.hasNext) {

        Some(iter.reduceLeft(cleanF))

      } else {

        None

      }

    }

    var jobResult: Option[T] = None

    val mergeResult = (index: Int, taskResult:Option[T]) => {

      if (taskResult.isDefined) {

        jobResult = jobResult match {

          case Some(value) => Some(f(value,taskResult.get))

          case None => taskResult

        }

      }

    }

    sc.runJob(this, reducePartition, mergeResult)

//获得Option的最后结果,或者当RDD为空时抛出异常

    jobResult.getOrElse(throw newUnsupportedOperationException("empty collection"))

  }

【例3-26reduce方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(1 to 10)  

  2. a.reduce((a,b)=> a + b)  

  3. res41: Int = 55  

 

这个例子使用简单的函数将输入的元素相加,过程是先输入前两个元素相加,然后将得到的结果与下一个输入元素相加,依次规则计算出所有元素的和。

3take

take方法会从RDD中取出前n[1]个元素。方法是先扫描一个分区并后从分区中得到结果,然后评估得到的结果是否达到取出元素个数,如果没达到则继续从其他分区中扫描获取。

方法源码实现:

 def take(num: Int): Array[T] = {

    if (num == 0) {

      return new Array[T](0)

    }

    val buf = new ArrayBuffer[T]

    val totalParts = this.partitions.length

    var partsScanned = 0

    while (buf.size < num &&partsScanned < totalParts) {

// numPartsToTry表示在这个迭代中尝试的分区数,这个数可以比总分区数大,因为在runJob中的总分区会限定它的值。

      var numPartsToTry = 1

      if (partsScanned > 0) {

//如果没有在之前的迭代中找到记录,则会重复寻找(次数翻四倍),此外还会调整分

 区数,最多调整涨幅不超过50%

        if (buf.size == 0) {

          numPartsToTry = partsScanned * 4

        } else {

          // the left side of max is >=1whenever partsScanned >= 2

          numPartsToTry = Math.max((1.5 * num *partsScanned / buf.size).toInt - partsScanned, 1)

          numPartsToTry =Math.min(numPartsToTry, partsScanned * 4)

        }

      }

      val left = num - buf.size

      val p = partsScanned untilmath.min(partsScanned + numPartsToTry, totalParts)

      val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

      res.foreach(buf ++= _.take(num -buf.size))

      partsScanned += numPartsToTry

    }

    buf.toArray

  }

【例3-27take方法应用样例

[java] view plain copy
  1. (1) val b = sc.parallelize(List("a""b""c""d""e"), 2)  

  2.    b.take(2)  

  3.    res18: Array[String] = Array(a, b)  

  4. (2) val b = sc.parallelize(1 to 1005)  

  5.    b.take(30)  

  6.    res6: Array[Int] = Array(123456789101112131415161718192021,222324252627282930)  


这里例子分别演示了字母和数字情况,其实工作原理都相同,即从分区中按先后顺序拿元素出来。

4.top

top方法会利用隐式排序转换方法(见实现源码中implicit修饰的方法)来获取最大的前n个元素。

方法源码实现:

def top(num: Int)(implicit ord:Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

def takeOrdered(num:Int)(implicit ord: Ordering[T]): Array[T] = {

    if (num == 0) {

      Array.empty

    } else {

      val mapRDDs = mapPartitions { items =>

        // Priority keeps the largest elements,so let's reverse the ordering.

        val queue = newBoundedPriorityQueue[T](num)(ord.reverse)

        queue ++=util.collection.Utils.takeOrdered(items, num)(ord)

        Iterator.single(queue)

      }

      if (mapRDDs.partitions.size == 0) {

        Array.empty

      } else {

        mapRDDs.reduce { (queue1, queue2) =>

          queue1 ++= queue2

          queue1

        }.toArray.sorted(ord)      

      }

    }

  }

【例3-28top方法应用样例

[java] view plain copy
  1. val c = sc.parallelize(Array(132,492,11,5), 3)  

  2. c.top(3)  

  3. res10:Array[Int] = Array(1195)  

例子显示了top的使用方法,很简洁,直接输入元素个数作为参数就能得到前n个元素的值。

5.count

count方法计算并返回RDD中元素的个数。

方法源码实现:

def count(): Long =sc.runJob(this, Utils.getIteratorSize _).sum

def runJob[T, U: ClassTag](rdd:RDD[T], func: Iterator[T] => U): Array[U] = {

    runJob(rdd, func, 0 until rdd.partitions.size,false)

}

【例3-29count方法应用样例

[java] view plain copy
  1. val c = sc.parallelize(Array(1,32,492,11,5), 2)  

  2. c.count  

  3. res3: Long = 8  


6.takeSample

takeSample方法返回一个固定大小的数组形式的采样子集,此外还把返回的元素顺序随机打乱,方法的三个参数含义依次是否放回数据、返回取样的大小和随机数生成器的种子。

方法源码实现:

def takeSample(withReplacement:Boolean,

     num: Int,

      seed: Long = Utils.random.nextLong):Array[T] = {

    val numStDev =  10.0

    if (num < 0) {

      throw newIllegalArgumentException("Negative number of elements requested")

    } else if (num == 0) {

      return new Array[T](0)

    }

    val initialCount = this.count()

    if (initialCount == 0) {

      return new Array[T](0)

    }

    val maxSampleSize = Int.MaxValue -(numStDev * math.sqrt(Int.MaxValue)).toInt

    if (num > maxSampleSize) {

      throw new IllegalArgumentException("Cannotsupport a sample size > Int.MaxValue - " +

        s"$numStDev *math.sqrt(Int.MaxValue)")

    }

    val rand = new Random(seed)

    if (!withReplacement && num >=initialCount) {

      returnUtils.randomizeInPlace(this.collect(), rand)

    }

    valfraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,

      withReplacement)

    var samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()

//如果采样容量不够大,则继续采样

    var numIters = 0

    while (samples.length < num) {

      logWarning(s"Needed to re-sample dueto insufficient sample size. Repeat #$numIters")

      samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()

      numIters += 1

    }

    Utils.randomizeInPlace(samples,rand).take(num)

  }

【例3-30takeSample方法应用样例

[java] view plain copy
  1. val x = sc.parallelize(1 to 1002)  

  2. x.takeSample(true301)  

  3. res13: Array[Int] = Array(723796474096571008448211324799943797,5241100789311610075144716)  


这个例子直接使用takeSample方法,得到30个固定数字的样本,采取有放回抽样的方式。

7.saveAsTextFile

RDD存储为文本文件,一次存一行。

方法源码实现:

def saveAsTextFile(path:String) {          

    val nullWritableClassTag =implicitly[ClassTag[NullWritable]]

    val textClassTag =implicitly[ClassTag[Text]]

    val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))

   rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)

     .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

}

def saveAsTextFile(path:String, codec: Class[_ <: CompressionCodec]) {     //参数可选择压缩方式

    // 参考https://issues.apache.org/jira/browse/SPARK-2075

    val nullWritableClassTag =implicitly[ClassTag[NullWritable]]

    val textClassTag =implicitly[ClassTag[Text]]

    val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))

   rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)

     .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)

}

【例3-31saveAsTextFile方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(1 to 1003)  

  2. a.saveAsTextFile("BIT_Spark")  

  3. //控制台打印出的部分日志  

  4. 15/08/04 10:27:58 INFO FileOutputCommitter: Saved output of task 'attempt_201508041027_0001       _m_000002_5' to file:/home/hadoop/spark/bin/BIT_Spark   

  5. //在当前路径下可以看到输出3个文件part-***,原因是RDD有3个分区,每个分区默认输出一个文件,SUCCESS文件执行表示成功。  

  6. hadoop@master:~/spark/bin/BIT_Spark$ ls  

  7. part-00000  part-00001  part-00002  _SUCCESS  

  8. hadoop@master:~/spark/bin/BIT_Spark$ vim part-00000     //查看第一个分区文件的内容  

  9. 1  

  10. 2  

  11. 3  

  12. 4  

  13. 5  


8.countByKey

类似count方法,不同的是countByKey方法会根据相同的Key计算其对应的Value个数,返回的是map类型的结果。

方法源码实现:

def countByKey(): Map[K, Long]= self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap

【例3-32countByKey方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(List((1"bit"), (2"xwc"), (2"fjg"), (3"wc"),(3"wc"),(3"wc")), 2)  

  2. a.countByKey  

  3. res3: scala.collection.Map[Int,Long] = Map(1 -> 12 -> 2,3 -> 3)  


这个例子先构造键值对变量a,然后使用countByKey方法对相同KeyValue进行统计,过程是先调用mapValue方法把Value映射为1,再reduceByKeyKey和其对应Value的个数。

9.aggregate

aggregate方法先将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

aggregate有两个函数seqOpcombOp,这两个函数都是输入两个参数,输出一个参数,其中seqOp函数可以看成是reduce操作,combOp函数可以看成是第二个reduce操作(一般用于combine各分区结果到一个总体结果),由定义,combOp操作的输入和输出类型必须一致。

方法源码实现:

def aggregate[U:ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {

//克隆zero值,因为这个值也会被序列化

    var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())        

    val cleanSeqOp = sc.clean(seqOp)

    val cleanCombOp = sc.clean(combOp)

    val aggregatePartition = (it: Iterator[T])=> it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)

    val mergeResult = (index: Int, taskResult:U) => jobResult = combOp(jobResult, taskResult)

    sc.runJob(this, aggregatePartition,mergeResult)

    jobResult

}

【例3-33aggregate方法应用样例

[java] view plain copy
  1. // 分区0的reduce操作是max(0, 2,3) = 3  

  2. // 分区1的reduce操作是max(0, 4,5) = 5  

  3. // 分区2的reduce操作是max(0, 6,7) = 7  

  4. // 最后的combine操作是0 + 3 + 5 + 7 = 15  

  5. //注意最后的reduce操作包含初始值  

  6. 1)val z = sc.parallelize(List(2,3,4,5,6,7), 3)  

  7.  z.aggregate(0)((a,b) => math.max(a, b), (c,d) => c + d )  

  8.  res6: Int = 15  

  9. // 分区0的reduce操作是max(3, 2,3) = 3  

  10. // 分区1的reduce操作是max(3, 4,5) = 5  

  11. // 分区2的reduce操作是max(3, 6,7) = 7  

  12. // 最后的combine操作是3 + 3 + 5 + 7 = 18  

[java] view plain copy
  1. 2)val z = sc.parallelize(List(2,3,4,5,6,7), 3)  

  2.  z.aggregate(3)((a,b) => math.max(a, b), (c,d) => c + d )  

  3.  res7: Int = 18  

[java] view plain copy
  1. 3)val z = sc.parallelize(List("a","b","c","d","e","f"),2)  

  2.  z.aggregate("")(_ + _, _+_)  

  3.  res8: String = defabc  

[java] view plain copy
  1. 4)val z = sc.parallelize(List("a","b","c","d","e","f"),2)  

  2.  z.aggregate("x")(_ + _, _+_)  

  3.  res9: String = xxdefxabc  


spark中一个分区对应一个task,从源码来看,zeroValue参与每个分区的seqOpreduce方法和最后的combOp(第二个reduce)方法,先对每个分区求reduce,在该例子中是对3个分区分别求Max操作,得到分区最大值,得到的结果参与combOp方法,即把各分区的结果和zeroValue相加最后得到结果值,从前两个例子可以看出这个操作特点,体现先分后总的思想。

对于后面两个例子使用的是字符串,aggregate方法的思路一样,先对各分区求seqOp方法然后再使用combOp方法把各分区的结果聚合相加,得到最终结果。

10fold

fold方法与aggregate方法原理类似,区别就是少了一个seqOp方法。fold方法是把每个分区的元素进行聚合,然后调用reduceop)方法处理。

方法源码实现:

def fold(zeroValue: T)(op: (T,T) => T): T = {

    //克隆zero值,因为这个值也会被序列化

    var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())

    val cleanOp = sc.clean(op)

    val foldPartition = (iter: Iterator[T])=> iter.fold(zeroValue)(cleanOp)

    val mergeResult = (index: Int, taskResult:T) => jobResult = op(jobResult, taskResult)

    sc.runJob(this, foldPartition, mergeResult)

    jobResult

}

【例3-34fold方法应用样例

[java] view plain copy
  1. // 分区0的reduce操作是0 + 1 + 2 + 3 = 6  

  2. // 分区1的reduce操作是0 + 4 + 5 + 6 = 15  

  3. // 分区2的reduce操作是0 + 7 + 8 + 9 = 24  

  4. // 最后的combine操作是0 + 6 + 15 + 24 = 45  

  5. 1)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)  

  6.      a.fold(0)(_ + _)  

  7.     res11:: Int = 45  

  8. // 分区0的reduce操作是1 + 1 + 2 + 3 = 7  

  9. // 分区1的reduce操作是1 + 4 + 5 + 6 = 16  

  10. // 分区2的reduce操作是1 + 7 + 8 + 9 = 25  

  11. // 最后的combine操作是1 + 7 + 16 + 25 = 53  

  12. 2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)  

  13.      a.fold(1)(_ + _)  

  14.      res12: Int = 53  


        这个例子中的使用方式与aggregate方法非常相似,注意zeroValue参与所有分区计算。fold计算是保证每个分区能独立计算,它与aggregate最大的区别是aggregate对不同分区提交的最终结果定义了一个专门的comOp函数来处理,而fold方法是采用一个方法来处理aggregate的两个方法过程。


3.6 本章小结

 

        本章主要为读者讲述了Spark核心开发部分,其中讲述了SparkContext的作用与创建过程,还对RDD的概念模型进行介绍,说明了RDD的Transformation和Action操作的内涵意义。在基本介绍Spark编程模型后在实践环节列出了主要的Transformation和Action方法的使用范例,同时结合了方法源码说明范例计算过程。本章为Spark应用基础,第六章将继续集合源码深入介绍RDD的运行机制和Spark调度机制。下一章将逐一介绍Spark的四大编程模型,让读者进一步学习并掌握Spark在不同业务场景下的应用。


 

本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,在此以知识共享为初衷公开部分内容,如有兴趣,请支持正版书籍。

本文》有 0 条评论

留下一个回复