0
点赞
收藏
分享

微信扫一扫

大数据学习笔记-------------------(5)

第5章 Spark调度与高级编程

5.1 Spark应用程序例子

         Spark应用程序用spark-submit这个shell命令把spark应用程序部署在集群上。通过统一的接口使用各自的集群管理器。因此,不必每一个应用程序进行配置。

         例如:使用与之前相同的例子,这次使用spark应用来操作这个例子。

         下面是in.txt文件包含的文本信息:


people are not asbeautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care asthey share.

    下面是用scala编写的spark应用程序:SparkWordCount.scala

         

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count=input.flatMap(line=>line.split(" "))
.map(word=>(word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}


         使用如下步骤提交应用。在终端,按照下面的步骤执行spark-application路径下。

         Step 1:下载Spark Jar

      Spark corejar被用于编译,下载spark-core_2.10-1.3.0.jar的网址如下:

         ​​ http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3.0​​

         把jar文件移动到spark-application路径下。

         Step 2:编译

         按照如下命令编译程序,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar是Spark 库中的hadoopjar。

$scalac -classpath"spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar"SparkPi.scala

         Step 3:创建Jar

         执行下面命令,创建一个wordcount.jar文件:


jar-cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

    Step 4:提交Spark应用程序

         提交spark应用的命令如下:

spark-submit --class SparkWordCount --master localwordcount.jar

    如果执行成功,将看到如下输出:

15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!



    Step 5:检查输出文件

         执行成功之后,在spark-application路径下会发现outfile文件夹。执行下面的命令,查看outfile文件夹下的文件:

        

$ cd outfile
$ls
Part-00000part-00001 _SUCCESS

    使用cat命令显示Part-00000 part-00001文件中的内容

5.2 Spark-submit语法

     spark-submit[options] <app jar | python file> [app arguments]

    options选项如下表:




    Option



Description



--master



spark://host:port, mesos://host:port, yarn, or local.



--deploy-mode



Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).



--class



Your application's main class (for Java / Scala apps).



--name



A name of your application.



--jars



Comma-separated list of local jars to include on the driver and executor classpaths.



--packages



Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths.



--repositories



Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages.



--py-files



Comma-separated list of .zip, .egg, or .py files to place on the PYTHON PATH for Python apps.



--files



Comma-separated list of files to be placed in the working directory of each executor.



--conf (prop=val)



Arbitrary Spark configuration property.



--properties-file



Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.



--driver-memory



Memory for driver (e.g. 1000M, 2G) (Default: 512M).



--driver-java-options



Extra Java options to pass to the driver.



--driver-library-path



Extra library path entries to pass to the driver.



--driver-class-path



Extra class path entries to pass to the driver.

Note that jars added with --jars are automatically included in the classpath.



--executor-memory



Memory per executor (e.g. 1000M, 2G) (Default: 1G).



--proxy-user



User to impersonate when submitting the application.



--help, -h



Show this help message and exit.



--verbose, -v



Print additional debug output.



--version



Print the version of current Spark.



--driver-cores NUM



Cores for driver (Default: 1).



--supervise



If given, restarts the driver on failure.



--kill



If given, kills the driver specified.



--status



If given, requests the status of the driver specified.



--total-executor-cores



Total cores for all executors.



--executor-cores



Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode).



 

5.3 Spark变量

         Spark包含两种不同的变量:broadcast变量Accumulators变量。

1.        broadcast变量:用于分发largevalues

         broadcast变量允许程序员保持一个只读变量缓存每台机器上,而不是shiping一个copy任务。它们可以被使用,例如,以有效的方式给每一个节点,一个大型输入数据集的copy。Spark也尝试使用高效broadcast算法以降低通信成本来分发broadcast变量。spark actions是通过阶段执行,把分布式shuffle操作分开。Spark自动broadcast每个阶段内任务所需要的公共数据。这种broadcast数据以序列化的形式被缓存,而且在运行的每个任务之前,反序列化。这意味着显式创建broadcast变量,只能用与如下情形:多个阶段的工作需要相同的数据、反序列化形式缓存数据。

         Broadcast变量通过调用SparkContext.broadcast(v)创建。broadcast变量被封装在v中,它的值可以调用value方法来使用。给出例子如下:

         scala> val broadcastVar =sc.broadcast(Array(1, 2, 3))

    输出:

    broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]] =        Broadcast(0)

         创建broadcast变量之后,它应该被用来代替在集群上运行的任何函数的值,这        样, broadcast变量不止一次的ship到节点。此外,broadcast变量对象不应被修饰在broadcast之后,以确保所有节点获得broadcast变量相同的值。       

2.        Accumulators变量:用于汇集特定集合的信息

         Accumulators是唯一一个通过关联操作被added而且支持高并发的变量。它们可以被用来实现计数器(如在MapReduce的)或求和。Spark本地支持数字类型的accumulators,并且程序员可以增加对新类型的支持。如果创建一个带有名字的accumulators,它们将在Spark UI显示。这对运行阶段的进度的理解是很有用(注:Python尚未支持)。

         accumulator通过调用SparkContext.accumulator(v)创建变量v,在集群上运行的任务可以通过add方法或者+=操作符(scala or python)加入accmulator。然而,它们的值是不能读取的。只有driver程序可以通过value方法读取accmulator的值。

         下面给出使用accumulator对数值中的元素进行求和:

         scala> val accum = sc.accumulator(0)

    scala>sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    通过如下命令查看,accumulator中的值:scala> accum.value

    输出结果:res2: Int = 10

5.4 数字类型RDD操作

         Spark允许对numericdata进行不同的操作。Spark 数字类型操作的实现可以通过创建一个一次一个元素的流算法模型实现。这些操作被计算而且通过调用status()方法返回一个StatusCounter对象。StatusCounts对象的数字类型的函数列表如下:




     Method



&Meaning



 



count()



Number of elements in the RDD.



 



Mean()



Average of the elements in the RDD.



 



Sum()



Total value of the elements in the RDD.



 



Max()



Maximum value among all elements in the RDD.



 



Min()



Minimum value among all elements in the RDD.



Variance()



Variance of the elements.



Stdev()



Standard deviation.



         如果想使用这些方法中的一个,可以在RDD直接调用相应的方法。

 


举报

相关推荐

0 条评论