我的Spark学习笔记( 四 )

reduceByKey算子:按相同key聚合import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 可以将数据按照相同的 Key 对 Value 进行聚合 */object reduceByKey {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))// reduceByKey : 相同的key的数据进行value数据的聚合操作// scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合// reduceByKey中如果key的数据只有一个,是不会参与运算的 。val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {println(s"x = ${x}, y = ${y}")x + y})reduceRDD.collect().foreach(println)sc.stop()}}groupByKey算子:根据key对数据分组import org.apache.spark.rdd.RDDimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/** * 将数据源的数据根据 key 对 value 进行分组 * * * reduceByKey 和 groupByKey的区别? * * 从 shuffle 的角度: reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey * 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量 。* 而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高 。* * 从功能的角度: reduceByKey 其实包含分组和聚合的功能 。groupByKey 只能分组,不能聚合 。* 所以在分组聚合的场合下 , 推荐使用 reduceByKey 。如果仅仅是分组而不需要聚合,那么还是只能使用 groupByKey 。*/object groupByKey {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))// groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组//元组中的第一个元素就是key,//元组中的第二个元素就是相同key的value的集合val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()groupRDD.collect().foreach(println)val groupRDD2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)groupRDD2.collect().foreach(println)val groupRDD3 = rdd.groupByKey(2)val groupRDD4 = rdd.groupByKey(new HashPartitioner(2))sc.stop()}}aggregateByKey算子:将数据根据不同的规则进行分区内计算和分区间计算import org.apache.spark.{SparkConf, SparkContext}/** * 将数据根据不同的规则进行分区内计算和分区间计算 * */object aggregateByKey {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)// aggregateByKey存在函数柯里化,有两个参数列表// 第一个参数列表,需要传递一个参数,表示为初始值//主要用于当碰见第一个key的时候 , 和value进行分区内计算// 第二个参数列表需要传递2个参数//第一个参数表示分区内计算规则//第二个参数表示分区间计算规则//取出每个分区内相同key的最大值 然后分区间相加rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y).collect.foreach(println)sc.stop()}}foldByKey算子:和aggregateByKey类似import org.apache.spark.{SparkConf, SparkContext}/** * 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey */object foldByKey {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5), ("a", 6)), 2)// rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)// 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法 , 用下面的替换上面的rdd.foldByKey(0)(_ + _).collect.foreach(println)sc.stop()}}combineByKey算子:和aggregateByKey类似import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function) 。* 类似于aggregate() ,  combineByKey()允许用户返回值的类型与输入不一致 。*/object combineByKey {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5), ("a", 6)), 2)// combineByKey : 方法需要三个参数// 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作// 第二个参数表示:分区内的计算规则// 第三个参数表示:分区间的计算规则val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(v => (v, 1),(t: (Int, Int), v) => {(t._1 + v, t._2 + 1)},(t1: (Int, Int), t2: (Int, Int)) => {(t1._1 + t2._1, t1._2 + t2._2)})val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, cnt) => {num / cnt}}resultRDD.collect().foreach(println)sc.stop()}}

推荐阅读