Python Programming Guide - Spark(Python)
Spark应用基本概念
每一个运行在cluster上的spark应用程序,是由一个运行main函数的driver program和运行多种并行操作的executes组成
其中spark的核心是弹性分布式数据集(Resilient Distributed Dataset—RDD)
Resilient(弹性):易变化、易计算
Distributed(分布式):可横跨多台机器,集群分布
Dataset(数据集):大批量数据的集合
<!-- more -->
RDD基本概念
RDD是逻辑集中的实体,代表一个分区的只读数据集,不可发生改变
【RDD的重要内部属性】
分区列表(partitions)
对于一个RDD而言,分区的多少涉及对这个RDD并行计算的粒度,每一个RDD分区的计算都会在一个单独的任务中执行,每一个分区对应一个Task,分区后的数据存放在内存当中计算每个分区的函数(compute)
对于Spark中每个RDD都是以分区进行计算的,并且每个分区的compute函数是在对迭代器进行复合操作,不需要每次计算,直到提交动作触发才会将之前所有的迭代操作进行计算,lineage在容错中有重要作用对父级RDD的依赖(dependencies)
由于RDD存在转换关系,所以新生成的RDD对上一个RDD有依赖关系,RDD之间通过lineage产生依赖关系
【窄依赖】
每一个父RDD的分区最多只被子RDD的一个分区所使用,可以类似于流水线一样,计算所有父RDD的分区;在节点计算失败的恢复上也更有效,可以直接计算其父RDD的分区,还可以进行并行计算子RDD的每个分区依赖于常数个父分区(即与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatmap
输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce
从输入中选择部分元素的算子,如filter、distinct、subtract、sample【宽依赖】
多个子RDD的分区会依赖于同一个父RDD的分区,需要取得其父RDD的所有分区数据进行计算,而一个节点的计算失败,将会导致其父RDD上多个分区重新计算子RDD的每个分区依赖于所有父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
对两个RDD基于key进行jion和重组,如jion
对key-value数据类型RDD的分区器,控制分区策略和分区数(partitioner)
partitioner就是RDD的分区函数,即HashPartitioner(哈希分区)和RangePartitioner(区域分区),分区函数决定了每个RDD的分区策略和分区数,并且这个函数只在(k-v)类型的RDD中存在,在非(k-v)结构的RDD中是None每个数据分区的地址列表(preferredLocations)
与Spark中的调度相关,返回的是此RDD的每个partition所出储存的位置,按照“移动数据不如移动计算”的理念,在spark进行任务调度的时候,尽可能将任务分配到数据块所存储的位置控制操作(control operation)
spark中对RDD的持久化操作是很重要的,可以将RDD存放在不同的存储介质中,方便后续的操作可以重复使用。
主要有cache、persist、checkpoint,checkpoint接口是将RDD持久化到HDFS中,与persist的区别是checkpoint会切断此RDD之前的依赖关系,而persist会保留依赖关系。checkpoint的两大作用:一是spark程序长期驻留,过长的依赖会占用很多的系统资源,定期checkpoint可以有效的节省资源;二是维护过长的依赖关系可能会出现问题,一旦spark程序运行失败,RDD的容错成本会很高
Python连接Spark
Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用标准的CPython解释器, 所以像NumPy这样的C语言类库也可以使用,同样也支持PyPy 2.3+
可以用spark目录里的bin/spark-submit脚本在python中运行spark应用程序,这个脚本可以加载Java/Scala类库,让你提交应用程序到集群当中。你也可以使用bin/pyspark脚本去启动python交互界面
如果你希望访问HDFS上的数据集,你需要建立对应HDFS版本的PySpark连接。
最后,你的程序需要import一些spark类库:
from pyspark import SparkContext, SparkConf
PySpark 要求driver和workers需要相同的python版本,它通常引用环境变量PATH默认的python版本;你也可以自己指定PYSPARK_PYTHON所用的python版本,例如:
PYSPARK_PYTHON=python3.4 bin/pyspark
PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
初始化Spark
一个Spark应用程序的第一件事就是去创建SparkContext对象,它的作用是告诉Spark如何建立一个集群。创建SparkContext之前,先要创建SparkConf对象,SparkConf包含了应用程序的相关信息。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName:应用的名称,用户显示在集群UI上
master:Spark、Mesos或者YARN集群的URL,如果是本地运行,则应该是特殊的'local'字符串
在实际运行时,你不会讲master参数写死在程序代码里,而是通过spark-submit来获取这个参数;在本地测试和单元测试中,你仍然需要'local'去运行Spark应用程序
使用Shell
在PySpark Shell中,一个特殊SparkContext已经帮你创建好了,变量名是:sc,然而在Shell中创建你自己的SparkContext是不起作用的。
你可以通过--master参数设置master所连接的上下文主机;你也可以通过--py-files参数传递一个用逗号作为分割的列表,将Python中的.zip、.egg、.py等文件添加到运行路径当中;你同样可以通过--packages参数,传递一个用逗号分割的maven列表,来个这个Shell会话添加依赖(例如Spark的包)
任何额外的包含依赖的仓库(如SonaType),都可以通过--repositories参数添加进来。
Spark中所有的Python依赖(requirements.txt的依赖包列表),在必要时都必须通过pip手动安装
例如用4个核来运行bin/pyspark:
./bin/pyspark --master local[4]
或者,将code.py添加到搜索路径中(为了后面可以import):
./bin/pyspark --master local[4] --py-files code.py
通过运行pyspark --help来查看完整的操作帮助信息,在这种情况下,pyspark会调用一个通用的spark-submit脚本
在IPython这样增强Python解释器中,也可以运行PySpark Shell;支持IPython 1.0.0+;在利用IPython运行bin/pyspark时,必须将PYSPARK_DRIVER_PYTHON变量设置成ipython:
PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
你可以通过PYSPARK_DRIVER_PYTHON_OPTS参数来自己定制ipython命令,比如在IPython Notebook中开启PyLab图形支持:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark
参考:Spark Programming Guide 官方文档
原博链接,请注明出处。