我的Spark学习笔记( 五 )

reduceByKey、 foldByKey、 aggregateByKey、 combineByKey 的区别reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同foldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同aggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构,分区内和分区间计算规则不相同join算子:相同key连接import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD */object join {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List(("a", 1), ("a", 2), ("c", 3), ("b", 3)))val rdd2 = sc.makeRDD(List(("a", 5), ("c", 6), ("a", 4)))// join : 两个不同数据源的数据,相同的key的value会连接在一起 , 形成元组//如果两个数据源中key没有匹配上,那么数据不会出现在结果中//如果两个数据源中key有多个相同的 , 会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低 。val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)joinRDD.collect().foreach(println)sc.stop()}}leftOuterJoin rightOuterJoin:左外连接 右外连接import org.apache.spark.{SparkConf, SparkContext}/** * 左外连接 右外连接 */object leftOuterJoin_rightOuterJoin {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2) //, ("c", 3)))val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))val leftJoinRDD = rdd1.leftOuterJoin(rdd2)val rightJoinRDD = rdd1.rightOuterJoin(rdd2)leftJoinRDD.collect().foreach(println)rightJoinRDD.collect().foreach(println)sc.stop()}}cogroup算子:分组 连接import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}/** * 分组 连接 */object cogroup {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2) //, ("c", 3)))val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6), ("c", 7)))// cogroup : connect + group (分组 , 连接)val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)cgRDD.collect().foreach(println)sc.stop()}}参考资料:Spark中文文档尚硅谷Spark教程
调度系统:如何把握分布式计算的精髓

推荐阅读