我的Spark学习笔记( 二 )

mapPartitionsWithIndex算子:分区索引 + 数据迭代器import org.apache.spark.{SparkConf, SparkContext}// 分区索引object mapPartitionsWithIndex {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mpiRDD = rdd.mapPartitionsWithIndex(//(分区索引,数据迭代器)(index, iter) => {println("index:" + index, "iter[" + iter.mkString(",") + "]")})mpiRDD.collect().foreach(println)sc.stop()}}flatMap算子:数据扁平化import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射object flatMap {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))// 多个list合并成一个listval flatRDD: RDD[Int] = rdd.flatMap(list => list)flatRDD.collect().foreach(println)sc.stop()}}glom算子:分区内数据合并import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将同一个分区的数据直接转换为相同类型的内存数组进行处理 , 分区不变object glom {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)// 把每一个分区内数据合并成Arrayval glomRDD: RDD[Array[Int]] = rdd.glom()glomRDD.collect().foreach(array => {println(array.mkString(","))})sc.stop()}}groupBy算子:数据分组import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle 。// 极限情况下,数据可能被分在同一个分区中一个组的数据在一个分区中 , 但是并不是说一个分区中只有一个组,分组和分区没有必然的关系object groupBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组,相同的key值的数据会放置在一个组中// val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)groupRDD.collect().foreach(println)sc.stop()}}filter算子:数据过滤import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 将数据根据指定的规则进行筛选过滤 , 符合规则的数据保留,不符合规则的数据丢弃 。// 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜 。object filter {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)filterRDD.collect().foreach(println)sc.stop()}}sample算子:数据采样随机抽取import org.apache.spark.{SparkConf, SparkContext}// 根据指定的规则从数据集中抽取数据object sample {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val dataRDD = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 1)// 抽取数据不放回(伯努利算法)// 伯努利算法:又叫 0、 1 分布 。例如扔硬币,要么正面,要么反面 。// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较 , 小于第二个参数要,大于不要// 第一个参数:抽取的数据是否放回,false:不放回// 第二个参数:抽取的几率 , 范围只能在[0,1]之间,0:全不?。?1:全?。?// 第三个参数:随机数种子val dataRDD1 = dataRDD.sample(false, 0.5)// 抽取数据放回(泊松算法)// 第一个参数:抽取的数据是否放回,true:放回; false:不放回// 第二个参数:重复数据的几率 , 范围大于等于0,可以大于1 表示每一个元素被期望抽取到的次数// 第三个参数:随机数种子// 例如数据集内有10个,fraction为1的话抽取10个,0.5的话抽取5个 , 2的话抽取20个val dataRDD2 = dataRDD.sample(true, 2)println(dataRDD1.collect().mkString(","))println(dataRDD2.collect().mkString(","))sc.stop()}}distinct算子:数据去重import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object distinct {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))val rdd1: RDD[Int] = rdd.distinct()val rdd2: RDD[Int] = rdd.distinct(2)// 底层相当于这样写val rdd3 = rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)println(rdd.collect().mkString(","))println(rdd1.collect().mkString(","))println(rdd2.collect().mkString(","))println(rdd3.collect().mkString(","))sc.stop()}}

推荐阅读