RDD(弹性分布式数据集)及常用算子RDD(Resilient Distributed Dataset)叫做弹性分布式数据集 , 是 Spark 中最基本的数据
处理模型 。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行
计算的集合 。
弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片 。
数据集:RDD 封装了计算逻辑,并不保存数据
数据抽象:RDD 是一个抽象类,需要子类具体实现
不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变 , 只能产生新的 RDD,在
- 新的 RDD 里面封装计算逻辑
五大特性:A list of partitionsA function for computing each splitA list of dependencies on other RDDsOptionally, a Partitioner for key-value RDDsOptionally, a list of preferred locations to compute each split on
文章插图
基础编程RDD 创建从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
val conf = new SparkConf().setMaster("local").setAppName("spark")val sc = new SparkContext(conf)val rdd1 = sc.parallelize( List(1,2,3,4))val rdd2 = sc.makeRDD( List(1,2,3,4))rdd1.collect().foreach(println)rdd2.collect().foreach(println)sc.stop()
从外部存储(文件)创建 RDDval conf = new SparkConf().setMaster("local").setAppName("spark")val sc = new SparkContext(conf)val fileRDD: RDD[String] = sc.textFile("input")fileRDD.collect().foreach(println)sc.stop()
RDD 转换算子RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型
/*** 在Spark所有的操作可以分为两类:* 1、Transformation操作(算子)* 2、Action操作(算子)** 转换算子是懒执行的,需要由Action算子触发执行* 每个Action算子会触发一个Job** Spark的程序的层级划分:* Application --> Job --> Stage --> Task** 怎么区分Transformation算子和Action算子?* 看算子的返回值是否还是RDD,如果是由一个RDD转换成另一个RDD,则该算子是转换算子* 如果由一个RDD得到其他类型(非RDD类型或者没有返回值),则该算子是行为算子** 在使用Spark处理数据时可以大体分为三个步骤:* 1、加载数据并构建成RDD* 2、对RDD进行各种各样的转换操作,即调用转换算子* 3、使用Action算子触发Spark任务的执行*/
map算子/*** map算子:转换算子* 需要接受一个函数f* 函数f:参数的个数只有一个,类型为RDD中数据的类型 => 返回值类型自己定义* 可以将函数f作用在RDD中的每一条数据上 , 需要函数f必须有返回值,最终会得到一个新的RDD* 传入一条数据得到一条数据*/ def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf()conf.setAppName("Demo03map")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val linesRDD: RDD[String] = sc.textFile("Spark/data/words.txt")linesRDD.map(line => {println("执行了map方法")line}).foreach(println)linesRDD.map(line => {println("执行了map方法")line}).foreach(println)linesRDD.map(line => {println("执行了map方法")line}).foreach(println)linesRDD.map(line => {println("执行了map方法")line}).foreach(println)List(1,2,3,4).map(line=>{println("List的map方法不需要什么Action算子触发")line})}
flatMap:转换算子def main(args: Array[String]): Unit = {/*** flatMap:转换算子* 同map算子类似 , 只不过所接受的函数f需要返回一个可以遍历的类型* 最终会将函数f的返回值进行展开(扁平化处理),得到一个新的RDD* 传入一条数据 会得到 多条数据*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo04flatMap")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)// 另一种构建RDD的方式:基于Scala本地的集合例如Listval intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))intRDD.foreach(println)val strRDD: RDD[String] = sc.parallelize(List("java,java,scala", "scala,scala,python", "python,python,python"))strRDD.flatMap(_.split(",")).foreach(println)}
filter:转换算子def main(args: Array[String]): Unit = {/*** filter:转换算子* 用于过滤数据,需要接受一个函数f* 函数f:参数只有一个,类型为RDD中每一条数据的类型 => 返回值类型必须为Boolean* 最终会基于函数f返回的Boolean值进行过滤,得到一个新的RDD* 如果函数f返回的Boolean为true则保留数据* 如果函数f返回的Boolean为false则过滤数据*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo05filter")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val seqRDD: RDD[Int] = sc.parallelize(1 to 100, 4)println(seqRDD.getNumPartitions) // getNumPartitions并不是算子,它只是RDD的一个属性//seqRDD.foreach(println)// 将奇数过滤出来seqRDD.filter(i => i % 2 == 1).foreach(println)// 将偶数过滤出来seqRDD.filter(i => i % 2 == 0).foreach(println)}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- Redis系列8:Bitmap实现亿万级数据计算
- 数据科学学习手札146 geopandas中拓扑非法问题的发现、诊断与修复
- 小样本利器4. 正则化+数据增强 Mixup Family代码实现
- python3使用libpcap库进行抓包及数据处理
- 分布式ID生成方案总结整理
- Python数据分析:实用向
- .NET API 接口数据传输加密最佳实践
- SQL分层查询
- 京东云开发者|京东云RDS数据迁移常见场景攻略
- 华为手机怎么连接电脑方法(华为usb数据线接电脑)