弹性分布式数据集 RDD及常用算子

RDD(弹性分布式数据集)及常用算子RDD(Resilient Distributed Dataset)叫做弹性分布式数据集 , 是 Spark 中最基本的数据
处理模型 。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行
计算的集合 。
弹性

  • 存储的弹性:内存与磁盘的自动切换;
  • 容错的弹性:数据丢失可以自动恢复;
  • 计算的弹性:计算出错重试机制;
  • 分片的弹性:可根据需要重新分片 。
【弹性分布式数据集 RDD及常用算子】分布式:数据存储在大数据集群不同节点上
数据集:RDD 封装了计算逻辑,并不保存数据
数据抽象:RDD 是一个抽象类,需要子类具体实现
不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变 , 只能产生新的 RDD,在
  • 新的 RDD 里面封装计算逻辑
可分区、并行计算
五大特性:A list of partitionsA function for computing each splitA list of dependencies on other RDDsOptionally, a Partitioner for key-value RDDsOptionally, a list of preferred locations to compute each split on
弹性分布式数据集 RDD及常用算子

文章插图
基础编程RDD 创建从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
val conf = new SparkConf().setMaster("local").setAppName("spark")val sc = new SparkContext(conf)val rdd1 = sc.parallelize( List(1,2,3,4))val rdd2 = sc.makeRDD( List(1,2,3,4))rdd1.collect().foreach(println)rdd2.collect().foreach(println)sc.stop()从外部存储(文件)创建 RDDval conf = new SparkConf().setMaster("local").setAppName("spark")val sc = new SparkContext(conf)val fileRDD: RDD[String] = sc.textFile("input")fileRDD.collect().foreach(println)sc.stop()RDD 转换算子RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value
类型
/*** 在Spark所有的操作可以分为两类:* 1、Transformation操作(算子)* 2、Action操作(算子)** 转换算子是懒执行的,需要由Action算子触发执行* 每个Action算子会触发一个Job** Spark的程序的层级划分:* Application --> Job --> Stage --> Task** 怎么区分Transformation算子和Action算子?* 看算子的返回值是否还是RDD,如果是由一个RDD转换成另一个RDD,则该算子是转换算子* 如果由一个RDD得到其他类型(非RDD类型或者没有返回值),则该算子是行为算子** 在使用Spark处理数据时可以大体分为三个步骤:* 1、加载数据并构建成RDD* 2、对RDD进行各种各样的转换操作,即调用转换算子* 3、使用Action算子触发Spark任务的执行*/map算子/*** map算子:转换算子* 需要接受一个函数f* 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型自己定义* 可以将函数f作用在RDD中的每一条数据上 , 需要函数f必须有返回值,最终会得到一个新的RDD* 传入一条数据得到一条数据*/ def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf()conf.setAppName("Demo03map")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val linesRDD: RDD[String] = sc.textFile("Spark/data/words.txt")linesRDD.map(line => {println("执行了map方法")line}).foreach(println)linesRDD.map(line => {println("执行了map方法")line}).foreach(println)linesRDD.map(line => {println("执行了map方法")line}).foreach(println)linesRDD.map(line => {println("执行了map方法")line}).foreach(println)List(1,2,3,4).map(line=>{println("List的map方法不需要什么Action算子触发")line})}flatMap:转换算子def main(args: Array[String]): Unit = {/*** flatMap:转换算子* 同map算子类似 , 只不过所接受的函数f需要返回一个可以遍历的类型* 最终会将函数f的返回值进行展开(扁平化处理),得到一个新的RDD* 传入一条数据 会得到 多条数据*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo04flatMap")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)// 另一种构建RDD的方式:基于Scala本地的集合例如Listval intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))intRDD.foreach(println)val strRDD: RDD[String] = sc.parallelize(List("java,java,scala", "scala,scala,python", "python,python,python"))strRDD.flatMap(_.split(",")).foreach(println)}filter:转换算子def main(args: Array[String]): Unit = {/*** filter:转换算子* 用于过滤数据,需要接受一个函数f* 函数f:参数只有一个,类型为RDD中每一条数据的类型 => 返回值类型必须为Boolean* 最终会基于函数f返回的Boolean值进行过滤,得到一个新的RDD* 如果函数f返回的Boolean为true则保留数据* 如果函数f返回的Boolean为false则过滤数据*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo05filter")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val seqRDD: RDD[Int] = sc.parallelize(1 to 100, 4)println(seqRDD.getNumPartitions) // getNumPartitions并不是算子,它只是RDD的一个属性//seqRDD.foreach(println)// 将奇数过滤出来seqRDD.filter(i => i % 2 == 1).foreach(println)// 将偶数过滤出来seqRDD.filter(i => i % 2 == 0).foreach(println)}

推荐阅读