我的Spark学习笔记( 三 )

coalesce算子:数据(shuffle可?。┲匦路智?/h2>import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 根据数据量缩减分区 , 用于大数据集过滤后,提高小数据集的执行效率 * 当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法 , 收缩合并分区,减少分区的个数,减小任务调度成本 */object coalesce {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// 默认3个分区val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)// coalesce方法默认情况下不会将分区的数据打乱重新组合,默认shuffer=false// 这种情况下的缩减分区可能会导致数据不均衡 , 出现数据倾斜,如果想要让数据均衡,可以进行shuffle处理// 缩减成2个分区并shufferval newRDD: RDD[Int] = rdd.coalesce(2, true)newRDD.saveAsTextFile("output")sc.stop()}}repartition算子:数据shuffle重新分区import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true 。* 无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,* repartition操作都可以完成,因为无论如何都会经 shuffle 过程 。* 直接用repartition就行,coalesce就别用了 */object repartition {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)// coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义 , 不起作用 。// 所以如果想要实现扩大分区的效果,需要使用shuffle操作/*** 底层就是coalesce* def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {* coalesce(numPartitions, shuffle = true)* }*/// 缩减分区val newRDD1: RDD[Int] = rdd.repartition(2)// 扩大分区val newRDD2: RDD[Int] = rdd.repartition(4)rdd.saveAsTextFile("output0")newRDD1.saveAsTextFile("output1")newRDD2.saveAsTextFile("output2")sc.stop()}}sortBy算子:数据排序import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 该操作用于排序数据 。在排序之前 , 可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列 。* 排序后新产生的 RDD 的分区数与原 RDD 的分区数一致 。中间存在shuffle的过程 。*/object sortBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// 例子1val rdd = sc.makeRDD(List(6, 2, 4, 5, 3, 1), 2)val newRDD: RDD[Int] = rdd.sortBy(n => n)println(newRDD.collect().mkString(","))newRDD.saveAsTextFile("output")// 例子2val rdd2 = sc.makeRDD(List(("1", 1), ("3", 2), ("2", 3)), 2)// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式// sortBy默认情况下,不会改变分区 。但是中间存在shuffle操作val newRDD1 = rdd2.sortBy(t => t._1.toInt, false) // 降序val newRDD2 = rdd2.sortBy(t => t._1.toInt, true) // 升序newRDD1.collect().foreach(println)newRDD2.collect().foreach(println)sc.stop()}}intersection union subtract zip:两个数据源 交 并 差 拉链/** * 两个数据源 交 并 差 拉链 */object intersection_union_subtract_zip {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)// 交集,并集和差集要求两个数据源数据类型保持一致val rdd1 = sc.makeRDD(List(1, 2, 3, 4))val rdd2 = sc.makeRDD(List(3, 4, 5, 6))// 交集 : 【3,4】val rdd3: RDD[Int] = rdd1.intersection(rdd2)println(rdd3.collect().mkString(","))// 并集 : 【1 , 2,3,4 , 3 , 4,5,6】val rdd4: RDD[Int] = rdd1.union(rdd2)println(rdd4.collect().mkString(","))// 差集 : 【1,2】val rdd5: RDD[Int] = rdd1.subtract(rdd2)println(rdd5.collect().mkString(","))// 拉链 : 【1-3 , 2-4 , 3-5,4-6】val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)println(rdd6.collect().mkString(","))// 拉链操作两个数据源的类型可以不一致,但要求分区中数据数量保持一致val rdd7 = sc.makeRDD(List("a", "b", "c", "d"))val rdd8 = rdd1.zip(rdd7)println(rdd8.collect().mkString(","))sc.stop()}}partitionBy算子:数据按照指定规则重新进行分区【我的Spark学习笔记】import org.apache.spark.rdd.RDDimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}/** * partitionBy:数据按照指定规则重新进行分区 。Spark 默认的分区器是 HashPartitioner * repartition coalesce:将分区增加或缩?。?数据是无规则的 */object partitionBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// PairRDDFunctions才支持partitionBy,所以需要先转换成mapRDDval mapRDD: RDD[(Int, Int)] = rdd.map(num => (num, 1))// partitionBy根据指定的分区规则对数据进行重分区val newRDD = mapRDD.partitionBy(new HashPartitioner(2))newRDD.partitionBy(new HashPartitioner(2))newRDD.saveAsTextFile("output")sc.stop()}}

推荐阅读