Spark 编程指南 (一) [Spa

发布时间:2019-09-28 08:37:44编辑:auto阅读(1725)

    Python Programming Guide - Spark(Python)

    Spark应用基本概念

    每一个运行在cluster上的spark应用程序,是由一个运行main函数的driver program和运行多种并行操作的executes组成

    其中spark的核心是弹性分布式数据集(Resilient Distributed Dataset—RDD)

    • Resilient(弹性):易变化、易计算

    • Distributed(分布式):可横跨多台机器,集群分布

    • Dataset(数据集):大批量数据的集合

    <!-- more -->

    RDD基本概念

    RDD是逻辑集中的实体,代表一个分区的只读数据集,不可发生改变

    【RDD的重要内部属性】

    • 分区列表(partitions)
      对于一个RDD而言,分区的多少涉及对这个RDD并行计算的粒度,每一个RDD分区的计算都会在一个单独的任务中执行,每一个分区对应一个Task,分区后的数据存放在内存当中

    • 计算每个分区的函数(compute)
      对于Spark中每个RDD都是以分区进行计算的,并且每个分区的compute函数是在对迭代器进行复合操作,不需要每次计算,直到提交动作触发才会将之前所有的迭代操作进行计算,lineage在容错中有重要作用

    • 对父级RDD的依赖(dependencies)
      由于RDD存在转换关系,所以新生成的RDD对上一个RDD有依赖关系,RDD之间通过lineage产生依赖关系

    【窄依赖】
    每一个父RDD的分区最多只被子RDD的一个分区所使用,可以类似于流水线一样,计算所有父RDD的分区;在节点计算失败的恢复上也更有效,可以直接计算其父RDD的分区,还可以进行并行计算

    子RDD的每个分区依赖于常数个父分区(即与数据规模无关)
    输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatmap
    输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce
    从输入中选择部分元素的算子,如filter、distinct、subtract、sample

    【宽依赖】
    多个子RDD的分区会依赖于同一个父RDD的分区,需要取得其父RDD的所有分区数据进行计算,而一个节点的计算失败,将会导致其父RDD上多个分区重新计算

    子RDD的每个分区依赖于所有父RDD分区
    对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
    对两个RDD基于key进行jion和重组,如jion

    • 对key-value数据类型RDD的分区器,控制分区策略和分区数(partitioner)
      partitioner就是RDD的分区函数,即HashPartitioner(哈希分区)和RangePartitioner(区域分区),分区函数决定了每个RDD的分区策略和分区数,并且这个函数只在(k-v)类型的RDD中存在,在非(k-v)结构的RDD中是None

    • 每个数据分区的地址列表(preferredLocations)
      与Spark中的调度相关,返回的是此RDD的每个partition所出储存的位置,按照“移动数据不如移动计算”的理念,在spark进行任务调度的时候,尽可能将任务分配到数据块所存储的位置

    • 控制操作(control operation)
      spark中对RDD的持久化操作是很重要的,可以将RDD存放在不同的存储介质中,方便后续的操作可以重复使用。

    主要有cache、persist、checkpoint,checkpoint接口是将RDD持久化到HDFS中,与persist的区别是checkpoint会切断此RDD之前的依赖关系,而persist会保留依赖关系。checkpoint的两大作用:一是spark程序长期驻留,过长的依赖会占用很多的系统资源,定期checkpoint可以有效的节省资源;二是维护过长的依赖关系可能会出现问题,一旦spark程序运行失败,RDD的容错成本会很高

    Python连接Spark

    Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用标准的CPython解释器, 所以像NumPy这样的C语言类库也可以使用,同样也支持PyPy 2.3+

    可以用spark目录里的bin/spark-submit脚本在python中运行spark应用程序,这个脚本可以加载Java/Scala类库,让你提交应用程序到集群当中。你也可以使用bin/pyspark脚本去启动python交互界面

    如果你希望访问HDFS上的数据集,你需要建立对应HDFS版本的PySpark连接。

    最后,你的程序需要import一些spark类库:

    from pyspark import SparkContext, SparkConf

    PySpark 要求driver和workers需要相同的python版本,它通常引用环境变量PATH默认的python版本;你也可以自己指定PYSPARK_PYTHON所用的python版本,例如:

    PYSPARK_PYTHON=python3.4 bin/pyspark
    PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

    初始化Spark

    一个Spark应用程序的第一件事就是去创建SparkContext对象,它的作用是告诉Spark如何建立一个集群。创建SparkContext之前,先要创建SparkConf对象,SparkConf包含了应用程序的相关信息。

    conf = SparkConf().setAppName(appName).setMaster(master)
    sc = SparkContext(conf=conf)
    • appName:应用的名称,用户显示在集群UI上

    • master:Spark、Mesos或者YARN集群的URL,如果是本地运行,则应该是特殊的'local'字符串

    在实际运行时,你不会讲master参数写死在程序代码里,而是通过spark-submit来获取这个参数;在本地测试和单元测试中,你仍然需要'local'去运行Spark应用程序

    使用Shell

    在PySpark Shell中,一个特殊SparkContext已经帮你创建好了,变量名是:sc,然而在Shell中创建你自己的SparkContext是不起作用的。

    你可以通过--master参数设置master所连接的上下文主机;你也可以通过--py-files参数传递一个用逗号作为分割的列表,将Python中的.zip、.egg、.py等文件添加到运行路径当中;你同样可以通过--packages参数,传递一个用逗号分割的maven列表,来个这个Shell会话添加依赖(例如Spark的包)

    任何额外的包含依赖的仓库(如SonaType),都可以通过--repositories参数添加进来。
    Spark中所有的Python依赖(requirements.txt的依赖包列表),在必要时都必须通过pip手动安装

    例如用4个核来运行bin/pyspark:

    ./bin/pyspark --master local[4]

    或者,将code.py添加到搜索路径中(为了后面可以import):

    ./bin/pyspark --master local[4] --py-files code.py

    通过运行pyspark --help来查看完整的操作帮助信息,在这种情况下,pyspark会调用一个通用的spark-submit脚本

    在IPython这样增强Python解释器中,也可以运行PySpark Shell;支持IPython 1.0.0+;在利用IPython运行bin/pyspark时,必须将PYSPARK_DRIVER_PYTHON变量设置成ipython:

    PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

    你可以通过PYSPARK_DRIVER_PYTHON_OPTS参数来自己定制ipython命令,比如在IPython Notebook中开启PyLab图形支持:

    PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark

    参考:Spark Programming Guide 官方文档

    原博链接,请注明出处。

关键字