PySpark基本原理
1. 关于spark
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
2. spark执行原理
- Master(ResourceManager):集群大管家,整个 集群的资源管理和分配 。
- Worker(NodeManager):单个机器的管家,负责 在单个服务器上提供运行容器,管理当前机器的资源。
- Driver:单个Spark任务的管理者,管理Executor 的任务执行和任务分解分配,类似YARN的 ApplicationMaster, 完成除了RDD运算外所有的工作。
- Executor:具体干活的进程,Spark的工作任务 (Task)都由Executor来负责执行,也就是说所有的RDD运算都是由它完成的。
- Driver中完成了SparkContext对象构建
- 将SparkContext对象以序列化的形式发送给Executor,每个Executor读取一部分数据,计算数据, shuffle交换数据,得到计算结果
- collect方法使得数据汇集到Driver,得到最终结果
大部分的spark作业是Driver->Executors->Driver,一份代码, 多个Executors并行运行
3. Python on Spark执行原理
- Driver部分: python代码构建SparkContext对象,以socket网络通道传输, 用Py4j翻译成JVM的代码运行
- Executor部分: Executor部分代码繁多, 难以进行java互相转化,不可以翻译成JVM代码. 启动deamon的守护进程,作为一个中转站, 完成JVM Executor和JVM Driver互相通信
- Driver的操作指令发送给JVM Executor
- JVM Executor通过PySpark守护进程将指令发送给PySpark守护进程
- PySpark守护进程将指令调度到运行的Python进程上.
- Executor端本质上是Python进程在工作
- 指令由JVM Executor发送(RPC)
Python->JVM代码->JVM Driver->调度JVM Executor->PySpark中转->Python Excutor进程
Driver运行JVM代码 Executor运行Python代码
4. SparkSql
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
相比于Spark RDD API,Spark SQL包含了对结构化数据和在其上运算的更多信息,Spark SQL使用这些信息进行了额外的优化,使对结构化数据的操作更加高效和方便。有多种方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但无论是哪种API或者是编程语言,它们都是基于同样的执行引擎,因此可以在不同的API之间随意切换,它们各有各的特点。
Hive将Hive SQL转换成MapReduce然后提交到集群中去执行,虽然大大简化了编写MapReduce程序的复杂性,由于MapReduce这种计算模型执行效率比较慢,所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群中去运行,执行效率非常快! SparkSQL的优点:
- 易整合
- 统一的数据访问
- 兼容Hive
- 标准的数据连接
因此,我们选择使用SparkSQL进行数据操作
5. RDD
RDD定义
分布式计算需要:
- 分区控制
- Shuffle控制
- 数据存储\序列化\发送
- 数据计算API
等一系列功能
这些功能, 不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成. 我们在分布式框架中, 需要有一个统一的数据抽象对象, 来实现上述分布式计算所需功能. 这个抽象对象, 就是RDD。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。 Dataset:一个数据集合,用于存放数据的(本地集合:都在一个进程中)。 Distributed:RDD中的数据是分布式存储的,可用于分布式计算(跨机器跨进程)。 Resilient:RDD中的数据可以存储在内存中或者磁盘中。
- RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里 面的元素可并行计算的集合。
- 所有的运算以及操作都建立在 RDD 数据结构的基础之上。
- 可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类Abstract Class和泛型Generic Type
可以认为是一个增强的list对象,以下为RDD的特性:
- RDD是有分区的 RDD分区是RDD数据存储的最小单位,一份RDD数据本质上是分割成了多个分区,RDD是抽象对象,属于逻辑存储,分区是物理概念。
rdd.glom().collect() # glom()方法把分区排布构建出来,从collect()输出
- RDD方法会作用到所有分区上 RDD的操作会对每一个分区执行,逻辑上RDD的操作是统一的,物理上的分区都会受到操作的影响
- RDD间有依赖关系 RDD都会有一个依赖的链条,称之为血缘关系,RDD之间进行迭代计算,和mapreduce的差不多 我们总是由一个RDD计算产生新的RDD再由新的RDD迭代计算产生新的RDD最后把结果产生
- (可选)对于KeyValue型RDD分区器可选 KV型RDD指存储数据类型为二元元组的RDD 分区器:相同的数据会分类到同一个组内 默认分区器:Hash分区规则,可以手动自定义分区器(rdd。partitionBy)
- (可选)移动数据不如移动计算 和性能有关,避免网络读取,确保并行计算的能力的条件下,尽量本地读取,RDD分区数据的读取会尽量靠近数据所在地。(经常会被触发)
7. DataFrame
DataFrame的前身是SchemaRDD,从Spark 1.3.0开始SchemaRDD更名为DataFrame。与SchemaRDD的主要区别是:DataFrame不再直接继承自RDD,而是自己实现了RDD的绝大多数功能。你仍旧可以在DataFrame上调用rdd方法将其转换为一个RDD。
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。DataFrame可以从很多数据源构建,比如:已经存在的RDD、结构化文件、外部数据库、Hive表。
DataFrame是一个二维表结构, 那么表格结构为: • 行 • 列 • 表结构描述 DataFrame的组成如下:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息 在数据层面
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。