0
点赞
收藏
分享

微信扫一扫

【Spark】广播变量和累加器


文章目录

  • ​​一、Spark广播变量​​
  • ​​二、累加器​​
  • ​​Reference​​

一、Spark广播变量

【Spark】广播变量和累加器_spark

多进程编程中,不同进程可以通过创建共享内存,进行进程间通信。而在分布式中,Spark通过【广播变量】和【累加器】进行共享变量。

【Spark】广播变量和累加器_spark_02

【栗子】广播变量

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("RDD Demo") \
.getOrCreate();
sc = spark.sparkContext
#############################################
conf = {"ip":"192.168.1.1","key":"cumt"}
#广播变量
brVar = sc.broadcast(conf)
#获取广播变量值
a = brVar.value

#{'ip': '192.168.1.1', 'key': 'cumt'}
print(a)

#cumt
print(a["key"])

#更新广播变量
brVar.unpersist()
conf["key"] = "jackwang"

#再次广播
brVar = sc.broadcast(conf)

#获取广播新变量值
a = brVar.value

#{'ip': '192.168.1.1', 'key': 'jackwang'}
print(a)

#destroy()可将广播变量的数据和元数据一同销毁,销毁后不能使用

brVar.destroy()
##############################################
sc.stop()

分析:
(1)上面通过​​​SparkContext.broadcast(conf)​​​将普通变量​​conf​​​创建成广播变量(一个包装变量,这时候该广播变量就能在集群中的其他节点进行共享数值了),我们可通过​​value​​​方法(即​​brVar.value​​),在各个节点访问其值,即引用广播变量的数值。

(2)如果需要改变广播变量​​brVar​​​的值,需要先使用​​brVar.unpersist()​​,然后修改数值后再次广播,就能够被集群的其他节点获取数值。

二、累加器

累加器只能利用关联操作做【加】操作。累加器能在调试时对作业的执行过程的相关事件进行计数。

这里也贴一个累加器的栗子:

import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("RDD Demo") \
.getOrCreate();
sc = spark.sparkContext

#############################################
rdd = sc.range(1,101)

#创建累加器,初始值0
acc = sc.accumulator(0)
def fcounter(x):
global acc
if x % 2 == 0 :
acc += 1
#unsupported operand type(s) for -=
#acc -= 1
rdd_counter = rdd.map(fcounter)
#获取累加器值
#0
print(acc.value)

#保证多次正确获取累加器值
rdd_counter.persist()

#100
print(rdd_counter.count())

#50
print(acc.value)

#100
print(rdd_counter.count())
#50
print(acc.value)
##############################################
sc.stop()

分析:
(1)上面首先创建了一个元素个数为100的RDD对象,后面在该RDD对象上执行一个​​​map​​​操作,​​map​​​的函数体是用​​global acc​​​引入一个全局累加器​​acc​​​(即一开始创建的​​sc.accumulator(0)​​的acc对象,而不是局部变量)。

(2)注意:要保证多次正确获得累加器值,需要先执行​​rdd_counter.;persist()​​​。如最后一组​​rdd_counter.count()​​​执行时,触发​​fcounter​​​函数,累加器会再次执行,变为50+50=100,但是一开始的​​rdd_counter.persisit()​​​切断了action的链条,导致只执行一次,所以​​acc.value​​还是50。

(3)使用累加器时,为了保证准确性,只能使用一次动作操作;如果需要使用多次动作操作,则在RDD对象上执行​​cache​​​或者​​persisit​​操作来切断依赖。

Reference

[1] Pyspark大数据实战
[2] ​​Spark中的共享变量(广播变量和累加器)​​ [3] 网络广播的实现原理
[4] spark广播变量的原理
[5] spark广播变量


举报

相关推荐

0 条评论