0
点赞
收藏
分享

微信扫一扫

python实战spark(三)--SparkContext


常用API

​​Spark官方文档​​

SparkConf

​class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)​配置应用:用于将各种spark参数设置为键值对。
大多数情况下,您将使用SparkConf()创建SparkConf对象,该对象将从​​spark.*​​加载值。 Java系统属性也是如此。在这种情况下,您直接在SparkConf对象上设置的任何参数优先于系统属性。

对于单元测试,您还可以调用SparkConf(false)来跳过加载外部设置并获得相同的配置,无论系统属性是什么。
该类中的所有setter方法都支持链式。如​​​conf.setMaster("local").setAppName("My app")​​​。
注意:一旦将SparkConf对象传递给Spark,它就会被克隆,用户不再可以修改它。
函数调用
​​​contains(key)​​​配置是否包含key
​​​get(key, defaultValue=None)​​​获取key的值或返回默认值
​​​getAll()​​​获取所有值作为key-value的列表
​​​set(key, value)​​​设置一个配置属性
​​​setAll(pairs)​​​设置多个参数,传递一个key-value的列表
​​​setAppName(value)​​​应用名
​​​setExecutorEnv(key=None, value=None, pairs=None)​​​设置环境变量
​​​setIfMissing(key, value)​​​如果不存在设置配置属性
​​​setMaster(value)​​​master URL
​​​setSparkHome(value)​​​spark安装在worker节点的路径
​​​toDebugString()​​返回一个打印版本的配置信息

SparkContext

​class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)​​ Spark功能的主要入口点。SparkContext表示到Spark集群的连接,可用于在该集群上创建RDD和广播变量。

注意:SparkContext实例不支持跨多个进程开箱共享,PySpark不保证多进程执行。将线程用于并发处理替代。
​​​PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')​函数调用
1.​​accumulator(value, accum_param=None)​​ 使用给定的初值创建累加器,使用给定的​​AccumulatorParam​​对象定义添加数据类型的值。如果不提供类型,默认整数和浮点数。

2.​​addFile(path, recursive=False)​​ 在每个节点上添加一个spark job要下载的文件。传递的路径可以是本地文件、HDFS中的文件(或其他hadoop支持的文件系统),也可以是HTTP、HTTPS或FTP URI。

要访问Spark作业中的文件,使用​​L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}​​​查找其下载位置。
将recursive选项为True,可以提供一个目录。目前目录只支持hadoop支持的文件系统。
注意:一个路径只能添加一次。相同路径的后续添加将被忽略。

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
... with open(SparkFiles.get("test.txt")) as testFile:
... fileVal = int(testFile.readline())
... return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]

3.​​addPyFile(path)​​​ 为将来在这个SparkContext上执行的所有任务添加.py或.zip依赖项。传递的路径可以是本地文件、HDFS中的文件(或其他hadoop支持的文件系统),也可以是HTTP、HTTPS或FTP URI。
注意:一个路径只能添加一次。相同路径的后续添加将被忽略。
​property applicationId​​ Spark应用的唯一标识符,格式取决于调度器实现。

>>> sc.applicationId  
'local-...'

4.​​binaryFiles(path, minPartitions=None)​​​ 以字节数组的形式从HDFS、本地文件系统(在所有节点上可用)或任何hadoop支持的文件系统URI中读取二进制文件的目录。每个文件作为一条记录读取,并以键-值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
:小文件是首选,大文件也是允许的,但可能会造成不良的性能。
​binaryRecords(path, recordLength)​​ 从flat binary file加载数据,假设每个记录是一组指定数字格式的数字(请参阅ByteBuffer),并且每个记录的字节数是常量。
参数:
path——输入数据文件的目录
recordLength——分割记录的长度
5.​​broadcast(value)​​前面提到的广播变量,只读,生成​​L{Broadcast<pyspark.broadcast.Broadcast>}​​对象,发送到每一台机器且仅发送一次。
6.​​cancelAllJobs()​​取消所有job(计划和running的)
7.​​cancelJobGroup(groupId)​​取消指定group的active job
8.​​property defaultMinPartitions​​默认最小分区
9.​​property defaultParallelism​​默认并行度
10.​​dump_profiles(path)​​将配置文件状态转储到目录路径中
11.​​emptyRDD()​​创建一个没有分区和元素的RDD
12.​​getConf()​​ 13.​​getLocalProperty(key)​​获取这个线程的本地属性
14.​​classmethod getOrCreate(conf=None)​​获取或实例化SparkContext并将其注册为单例对象。
15.​​hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)​​使用任意键值类从HDFS、本地文件系统(在所有节点上可用)或任何Hadoop支持的文件系统URI读取“旧的”Hadoop InputFormat。其机制与sc.sequenceFile相同。
Hadoop配置可以作为Python dict传递,它将被转换成Java配置。
参数
path --Hadoop文件的路径
inputFormatClass --严格的Hadoop InputFormat类名,如​​org.apache.hadoop.mapred.TextInputFormat​​ keyClass --严格的可写的类名的key
valueClass --严格的可写的类名的value
keyConverter
valueConverter
conf – Hadoop配置参数, 字典形式传递
batchSize – 多少个python对象代表一个java对象(默认为0)
16.​​newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)​​ 从HDFS、本地文件系统(在所有节点上可用)或任何Hadoop支持的文件系统URI中读取带有任意键和值类的“新API”Hadoop InputFormat。其机制与sc.sequenceFile相同。
参数与上述​​hadoopFile​​相似
17.​​newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)​​ 18.​​parallelize(c, numSlices=None)​​分发一个本地Python集合以形成一个RDD。如果输入表示一个性能范围,建议使用xrange。

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

19.​​pickleFile(name, minPartitions=None)[source]​​​加载以前使用​​RDD.saveAsPickleFile​​方法保存的RDD。

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

20.​​range(start, end=None, step=1, numSlices=None)​​类似python的range,生成元素创建一个RDD

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

21.​​runJob(rdd, partitionFunc, partitions=None, allowLocal=False)​​在指定的一组分区上执行给定的partitionFunc,以元素数组的形式返回结果。如果没有指定“分区”,这将在所有分区上运行。

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

22.​​sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)​​从HDFS、本地文件系统(在所有节点上可用)或任何Hadoop支持的文件系统URI中,读取带有任意的键值可写类的Hadoop SequenceFile

  • Java RDD是从​​SequenceFile​​​或其他​​InputFormat​​以及可写的键值类创建的
  • 序列化通过​​Pyrolite pickling​​实现
  • 如果失败,回退方法是对每个键和值调用’ toString ’
  • PickleSerializer用于在Python端反序列化pickle对象

23.​​setCheckpointDir(dirName)​​​设置RDDs checkpointed的目录。如果在集群上运行,目录必须是HDFS路径。
24.​​​setJobDescription(value)​​​为job设定一个可读的描述
25.​​​setJobGroup(groupId, description, interruptOnCancel=False)​​​将组ID分配给此线程启动的所有作业,直到组ID设置为不同的值或清除为止。
通常,应用程序中的一个执行单元由多个Spark操作或作业组成。我们可以使用此方法将所有这些作业分组并给出组描述。设置之后,Spark web UI将把这些作业与这个组关联起来。
应用可以使用​​​SparkContext.cancelJobGroup​​来取消此组中所有正在运行的作业。

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
... sleep(100)
... raise Exception("Task should have been cancelled")
>>> def start_job(x):
... global result
... try:
... sc.setJobGroup("job_to_cancel", "some description")
... result = sc.parallelize(range(x)).map(map_func).collect()
... except Exception as e:
... result = "Cancelled"
... lock.release()
>>> def stop_job():
... sleep(5)
... sc.cancelJobGroup("job_to_cancel")
>>> suppress = lock.acquire()
>>> suppress = threading.Thread(target=start_job, args=(10,)).start()
>>> suppress = threading.Thread(target=stop_job).start()
>>> suppress = lock.acquire()
>>> print(result)
Cancelled

如果将job组的​​interruptOnCancel​​​设置为true,那么作业取消将导致在作业的执行器线程上调用Thread.interrupt()。这有助于确保及时地停止任务,但是由于HDFS-1208而默认关闭,其中HDFS可能通过将节点标记为死节点来响应​​Thread.interrupt()​​​。
26.​​​setLocalProperty(key, value)​​​设置影响从该线程提交的作业的本地属性,如Spark fair调度程序池。
27.​​​setLogLevel(logLevel)​​​ ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
28.​​​classmethod setSystemProperty(key, value)​​​设置Java系统属性,如​​spark.executor.memory​​​,必须在实例化SparkContext之前调用它。
29.​​​show_profiles()​​​将配置文件状态打印到stdout
30.​​​sparkUser()​​​获取在运行SparkContext的用户
31.​​​property startTime​​​SparkContext开始的epoch time
32.​​​statusTracker()​​​返回statusTracker对象
33.​​​stop()​​​停止SparkContext
34.​​​textFile(name, minPartitions=None, use_unicode=True)​​从HDFS、本地文件系统(在所有节点上可用)或任何hadoop支持的文件系统URI读取文本文件,并将其作为字符串的RDD返回。如果use_unicode为False,则字符串将保留为str(编码为utf-8),这比unicode更快、更小。(在Spark 1.2中添加)

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

35.​​property uiWebUrl​​​返回这个SparkContext启动的SparkUI实例的URL
36.​​​union(rdds)​​构建RDDs列表的unions()。这支持不同序列化格式的RDDs的union(),尽管这迫使它们使用默认的序列化器重新序列化。

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
... _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']

37.​​property version​​​返回应用运行的spark版本
38.​​​wholeTextFiles(path, minPartitions=None, use_unicode=True)​​​ 从HDFS、本地文件系统(在所有节点上可用)或任何hadoop支持的文件系统URI中读取文本文件目录。每个文件作为一条记录读取,并以键-值对的形式返回,其中键是每个文件的路径,值是每个文件的内容。
如果use_unicode为False,则字符串将保留为str(编码为utf-8),这比unicode更快、更小。(在Spark 1.2中添加)
例如,如果你有以下文件:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

运行​​rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”)​​,则rdd包含

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

注意:最好是小文件,因为每个文件都将在内存中完全加载。

SparkFiles

解析通过​​L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}​​​添加的文件的路径。
SparkFiles只包含类方法;用户不应该创建SparkFiles实例。
​​​classmethod get(filename)​​​ 获取通过​​SparkContext.addFile()​​添加的文件的绝对路径。
​classmethod getRootDirectory()​​ 获取包含通过​​SparkContext.addFile()​​添加的文件的根目录。


举报

相关推荐

0 条评论