弹性分布式数据集 RDD及常用算子( 三 )

aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合def main(args: Array[String]): Unit = {/*** aggregateByKey:转换算子,可以实现将多个聚合方式放在一起实现,并且也能对Map进行预聚合* 可以弥补reduceByKey无法实现avg操作**/val conf: SparkConf = new SparkConf()conf.setAppName("Demo13aggregateByKey")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")val ageKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), s.split(",")(2).toInt))val clazzCntKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))// 统计每个班级年龄之和val ageSumRDD: RDD[(String, Int)] = ageKVRDD.reduceByKey(_ + _)// 统计每个班级人数val clazzCntRDD: RDD[(String, Int)] = clazzCntKVRDD.reduceByKey(_ + _)// 统计每个班级的平均年龄ageSumRDD.join(clazzCntRDD).map {case (clazz: String, (ageSum: Int, cnt: Int)) =>(clazz, ageSum.toDouble / cnt)}.foreach(println)/*** zeroValue:初始化的值,类型自定义,可以是数据容器* seqOp:在组内(每个分区内部即每个Map任务)进行的操作 , 相当是Map端的预聚合操作* combOp:在组之间(每个Reduce任务之间)进行的操作,相当于就是最终每个Reduce的操作*/// 使用aggregateByKey统计班级年龄之和ageKVRDD.aggregateByKey(0)((age1: Int, age2: Int) => {age1 + age2 // 预聚合}, (map1AgeSum: Int, map2AgeSum: Int) => {map1AgeSum + map2AgeSum // 聚合}).foreach(println)// 使用aggregateByKey统计班级人数clazzCntKVRDD.aggregateByKey(0)((c1: Int, c2: Int) => {c1 + 1 // 预聚合}, (map1Cnt: Int, map2Cnt: Int) => {map1Cnt + map2Cnt // 聚合}).foreach(println)// 使用aggregateByKey统计班级的平均年龄ageKVRDD.aggregateByKey((0, 0))((t2: (Int, Int), age: Int) => {val mapAgeSum: Int = t2._1 + ageval mapCnt: Int = t2._2 + 1(mapAgeSum, mapCnt)}, (map1U: (Int, Int), map2U: (Int, Int)) => {val ageSum: Int = map1U._1 + map2U._1val cnt: Int = map1U._2 + map2U._2(ageSum, cnt)}).map {case (clazz: String, (sumAge: Int, cnt: Int)) =>(clazz, sumAge.toDouble / cnt)}.foreach(println)}cartesian:转换算子,可以对两个RDD做笛卡尔积def main(args: Array[String]): Unit = {/*** cartesian:转换算子,可以对两个RDD做笛卡尔积** 当数据重复时 很容易触发笛卡尔积 造成数据的膨胀*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo14cartesian")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val idNameKVRDD: RDD[(String, String)] = sc.parallelize(List(("001", "zs"), ("002", "ls"), ("003", "ww")))val genderAgeKVRDD: RDD[(String, Int)] = sc.parallelize(List(("男", 25), ("女", 20), ("男", 22)))idNameKVRDD.cartesian(genderAgeKVRDD).foreach(println)}sortBy:转换算子 可以指定一个字段进行排序 默认升序def main(args: Array[String]): Unit = {/*** sortBy:转换算子 可以指定一个字段进行排序 默认升序*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo15sortBy")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val intRDD: RDD[Int] = sc.parallelize(List(1, 3, 6, 5, 2, 4, 6, 8, 9, 7))intRDD.sortBy(i => i).foreach(println) // 升序intRDD.sortBy(i => -i).foreach(println) // 降序intRDD.sortBy(i => i, ascending = false).foreach(println) // 降序val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")// 按照年龄进行降序stuRDD.sortBy(s => -s.split(",")(2).toInt).foreach(println)}常见的Action算子def main(args: Array[String]): Unit = {/*** 常见的Action算子:foreach、take、collect、count、reduce、save相关* 每个Action算子都会触发一个job**/val conf: SparkConf = new SparkConf()conf.setAppName("Demo16Action")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")/*** foreach:对每条数据进行处理,跟map算子的区别在于,foreach算子没有返回值*/stuRDD.foreach(println)// 将stuRDD中的每条数据保存到MySQL中/*** 建表语句:* CREATE TABLE `stu_rdd` (* `id` int(10) NOT NULL AUTO_INCREMENT,* `name` char(5) DEFAULT NULL,* `age` int(11) DEFAULT NULL,* `gender` char(2) DEFAULT NULL,* `clazz` char(4) DEFAULT NULL,* PRIMARY KEY (`id`)* ) ENGINE=InnoDB DEFAULT CHARSET=utf8;*/// 每一条数据都会创建一次连接,频繁地创建销毁连接效率太低 , 不合适//stuRDD.foreach(line => {//val splits: Array[String] = line.split(",")//// 1、建立连接//val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false", "root", "123456")//println("建立了一次连接")//// 2、创建prepareStatement//val pSt: PreparedStatement = conn.prepareStatement("insert into stu_rdd(id,name,age,gender,clazz) values(?,?,?,?,?)")////// 3、传入参数//pSt.setInt(1, splits(0).toInt)//pSt.setString(2, splits(1))//pSt.setInt(3, splits(2).toInt)//pSt.setString(4, splits(3))//pSt.setString(5, splits(4))////// 4、执行SQL//pSt.execute()////// 5、关闭连接//conn.close()////})/*** take : Action算子 , 可以将指定条数的数据转换成Scala中的Array**/// 这里的foreach是Array的方法,不是算子stuRDD.take(5).foreach(println)/*** collect : Action算子 , 可以将RDD中所有的数据转换成Scala中的Array*/// 这里的foreach是Array的方法,不是算子stuRDD.collect().foreach(println)/*** count : Action算子,统计RDD中数据的条数*/println(stuRDD.count())/*** reduce : Action算子,将所有的数据作为一组进行聚合操作*/// 统计所有学生的年龄之和println(stuRDD.map(_.split(",")(2).toInt).reduce(_ + _))/*** save相关:* saveAsTextFile、saveAsObjectFile*/}

推荐阅读