Resilient Distributed Datasets (RDDs) and Common Operators (Part 2)

All of the examples below assume the usual Spark imports: `import org.apache.spark.{SparkConf, SparkContext}` and `import org.apache.spark.rdd.RDD`.

sample: transformation operator

```scala
def main(args: Array[String]): Unit = {
  /**
    * sample: transformation operator
    * Takes a random sample of the data.
    * Three parameters:
    *   withReplacement: whether to sample with replacement
    *   fraction: the sampling ratio (not exact, since sampling is random)
    *   seed: the random seed
    */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo06sample")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  stuRDD.sample(withReplacement = false, 0.1).foreach(println)

  // To get the same sample on every run, fix the seed
  stuRDD.sample(withReplacement = false, 0.01, 10).foreach(println)
}
```

mapValues: transformation operator

```scala
def main(args: Array[String]): Unit = {
  /**
    * mapValues: transformation operator
    * Similar to map, except that mapValues only processes the Value of a KV-format RDD
    */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo07mapValues")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val kvRDD: RDD[(String, Int)] = sc.parallelize(List("k1" -> 1, "k2" -> 2, "k3" -> 3))

  // Square each Key's Value
  kvRDD.mapValues(i => i * i).foreach(println)

  // The same thing implemented with map
  kvRDD.map(kv => (kv._1, kv._2 * kv._2)).foreach(println)
}
```

join: transformation operator

```scala
def main(args: Array[String]): Unit = {
  /**
    * join: transformation operator
    * Applied to two KV-format RDDs; records with the same Key are joined together
    */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo08join")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  // Load the student data and convert it to KV format: ID as the Key, the remaining fields as the Value
  val stuKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/students.txt").map(line => {
    val id: String = line.split(",")(0)
    // split: splits a string into an Array by the given separator
    // mkString: joins an Array into a string with the given separator
    val values: String = line.split(",").tail.mkString("|")
    (id, values)
  })

  // Load the score data and convert it to KV format: ID as the Key, the remaining fields as the Value
  val scoKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/score.txt").map(line => {
    val id: String = line.split(",")(0)
    val values: String = line.split(",").tail.mkString("|")
    (id, values)
  })

  // join: inner join
  val joinRDD1: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)
  //joinRDD1.foreach(println)

  //stuKVRDD.leftOuterJoin(scoKVRDD).foreach(println)
  //stuKVRDD.rightOuterJoin(scoKVRDD).foreach(println)
  stuKVRDD.fullOuterJoin(scoKVRDD).foreach(println)
}
```

union: transformation operator; concatenates two RDDs of the same type

```scala
def main(args: Array[String]): Unit = {
  // union: transformation operator; concatenates two RDDs of the same type
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo09union")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  val sample01RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
  val sample02RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)

  println(s"Number of partitions of sample01RDD: ${sample01RDD.getNumPartitions}")
  println(s"Number of partitions of sample02RDD: ${sample02RDD.getNumPartitions}")
  // The RDD produced by union has as many partitions as the two input RDDs combined
  println(s"Number of partitions after union: ${sample01RDD.union(sample02RDD).getNumPartitions}")

  val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
  //sample01RDD.union(intRDD) // RDDs of different types cannot be unioned

  // union behaves like UNION ALL in SQL
  sample01RDD.union(sample02RDD).foreach(println)
  // To deduplicate (the equivalent of UNION in SQL), call distinct after the union
  sample01RDD.union(sample02RDD).distinct().foreach(println)
}
```

groupBy: groups the data by a given field

```scala
def main(args: Array[String]): Unit = {
  /**
    * groupBy: groups the data by a given field
    */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo10groupBy")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

  // Count the number of students per class (the class is the field at column index 4)
  stuRDD.groupBy(s => s.split(",")(4))
    .map(kv => s"${kv._1},${kv._2.size}")
    .foreach(println)
}
```

Because groupBy hands you the whole record for each Key, other per-group statistics can be computed the same way; a sketch follows below.
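As a small extension of the groupBy example, the sketch below computes the average age per class. It reuses the students.txt layout from the examples above (comma-separated, class at column index 4), but the age column index (2 here) and the demo object name are assumptions of this sketch, not something confirmed by the original post.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original Demo series
object Demo10bGroupByAvg {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo10bGroupByAvg")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // Group whole records by class (column index 4), then average the age per group.
    // NOTE: age at column index 2 is an assumed position in students.txt
    stuRDD.groupBy(line => line.split(",")(4))
      .mapValues(lines => {
        val ages: Iterable[Int] = lines.map(line => line.split(",")(2).toInt)
        ages.sum.toDouble / ages.size
      })
      .foreach(println)
  }
}
```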
groupByKey: transformation operator; must be applied to a KV-format RDD

```scala
def main(args: Array[String]): Unit = {
  /**
    * groupByKey: transformation operator; must be applied to a KV-format RDD
    */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo11groupByKey")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

  // Use groupByKey to count the number of students per class:
  // turn the student data into a KV-format RDD with the class as the Key and 1 as the Value
  val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))
  val grpRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()
  grpRDD.map(kv => s"${kv._1},${kv._2.size}").foreach(println)
}
```

reduceByKey: transformation operator; must be applied to a KV-format RDD; it groups and aggregates in one step

```scala
def main(args: Array[String]): Unit = {
  /**
    * reduceByKey: transformation operator; must be applied to a KV-format RDD;
    * it groups and aggregates in one step.
    * It takes a function f:
    *   f has two parameters whose type matches the RDD's Value type, and it must
    *   return a value of that same type.
    * In effect, f acts as an aggregation function.
    * Common aggregations: max, min, sum, count, avg.
    * reduceByKey performs map-side pre-aggregation, similar to the Combiner in MapReduce.
    * Not every aggregation can be pre-aggregated this way; avg, for example, cannot be
    * expressed directly (see the sketch after this example).
    */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo12reduceByKey")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

  // Use reduceByKey to count the number of students per class:
  // turn the student data into a KV-format RDD with the class as the Key and 1 as the Value
  val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))
  clazzKVRDD.reduceByKey((i1: Int, i2: Int) => i1 + i2).foreach(println)

  // Shorter forms
  clazzKVRDD.reduceByKey((i1, i2) => i1 + i2).foreach(println)
  clazzKVRDD.reduceByKey(_ + _).foreach(println)
}
```
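The comment above notes that avg cannot be fed to reduceByKey directly, because an average of partial averages is not the overall average. A common workaround, sketched below under the same assumed column layout as the earlier average-age sketch (class at index 4, age at index 2; the object name is likewise made up), is to carry (sum, count) pairs through reduceByKey. Both sum and count are associative, so map-side pre-aggregation still applies, and the division happens only at the end.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original Demo series
object Demo12bReduceByKeyAvg {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo12bReduceByKeyAvg")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // Key: class (column index 4); Value: (age, 1), so that both the sum and the
    // count can be pre-aggregated on the map side.
    // NOTE: age at column index 2 is an assumed position in students.txt
    val ageKVRDD: RDD[(String, (Int, Int))] = stuRDD.map(line => {
      val cols: Array[String] = line.split(",")
      (cols(4), (cols(2).toInt, 1))
    })

    // Sum the ages and the counts per class, then divide to get the average age
    ageKVRDD
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(sumCnt => sumCnt._1.toDouble / sumCnt._2)
      .foreach(println)
  }
}
```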
