0
点赞
收藏
分享

微信扫一扫

Spark中RDD的常用操作(python)


Spark中RDD的常用操作(python)

转换操作
行动操作

除以下操作外,对RDD还存在一些常见数据操作如:

name()返回rdd的名称

min()返回rdd中的最小值

sum()叠加rdd中所有元素

take(n)取rdd中前n个元素

count()返回rdd的元素个数

# -*- coding:utf-8 -*-
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import math
appName ="jhl_spark_1" #你的应用程序名称
master= "local"#设置单机
conf = SparkConf().setAppName(appName).setMaster(master)#配置SparkContext
sc = SparkContext(conf=conf)

# parallelize:并行化数据,转化为RDD
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, numSlices=10) # numSlices为分块数目,根据集群数进行分块

# textFile读取外部数据
rdd = sc.textFile("./c2.txt") # 以行为单位读取外部文件,并转化为RDD
print rdd.collect()

# map:迭代,对数据集中数据进行单独操作
def my_add(l):
return (l,l)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data) # 并行化数据集
result = distData.map(my_add)
print (result.collect()) # 返回一个分布数据集


# filter:过滤数据
def my_add(l):
result = False
if l > 2:
result = True
return result
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)#并行化数据集,分片
result = distData.filter(my_add)
print (result.collect())#返回一个分布数据集

# zip:将两个RDD对应元素组合为元组
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
print x.zip(y).collect()





#union 组合两个RDD
print x.union(x).collect()
# Aciton操作

# collect:返回RDD中的数据
rdd = sc.parallelize(range(1, 10))
print rdd
print rdd.collect()

# collectAsMap:以rdd元素为元组,以元组中一个元素作为索引返回RDD中的数据
m = sc.parallelize([('a', 2), (3, 4)]).collectAsMap()
print m['a']
print m[3]

# groupby函数:根据提供的方法为RDD分组:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
def fun(i):
return i % 2
result = rdd.groupBy(fun).collect()
print [(x, sorted(y)) for (x, y) in result]

# reduce:对数据集进行运算
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print

常见的转化操作和行动操作

Spark中RDD的常用操作(python)_spark


Spark中RDD的常用操作(python)_spark_02


Spark中RDD的常用操作(python)_数据_03


Spark中RDD的常用操作(python)_数据_04

map() 将函数应用于 RDD 中的每个元
素,将返回值构成新的 RDD
rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() 将函数应用于 RDD 中的每个元
素,将返回的迭代器的所有内
容构成新的 RDD。通常用来切
分单词
rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() 返回一个由通过传给 filter()
的函数的元素组成的 RDD
rdd.filter(x => x != 1) {2, 3, 3}
distinct() 去重 rdd.distinct() {1, 2, 3}
sample(withRe
placement, fra
ction, [seed])



union() 生成一个包含两个 RDD 中所有元
素的 RDD
rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() 求两个 RDD 共同的元素的 RDD rdd.intersection(other) {3}
subtract() 移除一个 RDD 中的内容(例如移
除训练数据)
rdd.subtract(other) {1, 2}
cartesian() 与另一个 RDD 的笛卡儿积 rdd.cartesian(other) {(1, 3), (1, 4), ...
(3, 5)}



collect() 返回 RDD 中的所有元素 rdd.collect() {1, 2, 3, 3}
count() RDD 中的元素个数 rdd.count() 4
countByValue() 各元素在 RDD 中出现的次数 rdd.countByValue() {(1, 1),
(2, 1),
(3, 2)}
take(num) RDD 中返回 num 个元素 rdd.take(2) {1, 2}
top(num) RDD 中返回最前面的 num
个元素
rdd.top(2) {3, 3}
takeOrdered(num)
(ordering)
RDD 中按照提供的顺序返
回最前面的 num 个元素
rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(withReplace
ment, num, [seed])
RDD 中返回任意一些元素 rdd.takeSample(false, 1) 非确定的
reduce(func) RDD
(例如 sum
rdd.reduce((x, y) => x + y) 9
fold(zero)(func) reduce() 样,
提供初始值
rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)
(seqOp, combOp)
reduce() 似,
返回不同类型的函数
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
(9,4)
foreach(func) RDD 中的每个元素使用给
定的函数
rdd.foreach(func)


举报

相关推荐

0 条评论