from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').\
        config('spark.sql.shuffle.partitions', '2').\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x: [x])
    df = rdd.toDF(['num'])

    # TODO1: sparkSession.udf.register()
    # the UDF's processing function
    def num_ride_10(num):
        return num * 10

    # parameter 1: the name to register the UDF under; this name can only be used in SQL style
    # parameter 2: the UDF's processing logic, written as a separate function
    # parameter 3: the UDF's return type; note: a return type must be declared at registration,
    #              and the UDF's actual return type must match the declared type
    # return value: a UDF object, usable only in DSL style
    # so a UDF defined this way can be used in SQL style via the name in parameter 1,
    # and in DSL style via the returned object
    udf2 = spark.udf.register('udf1', num_ride_10, IntegerType())

    # SQL style
    # selectExpr evaluates SQL-style expressions passed as strings
    # the select method accepts plain column-name strings, or computations that return Column objects
    df.selectExpr("udf1(num)").show()

    # DSL style
    # when the returned UDF object is called as a function, the arguments must be Column objects
    df.select(udf2(df['num'])).show()

    # TODO2: F.udf produces a UDF usable only in DSL style
    udf3 = F.udf(num_ride_10, IntegerType())
    df.select(udf3(df['num'])).show()
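    # A minimal side sketch (assumption: reusing the `df` defined above): F.udf can also be
    # applied as a decorator, which keeps the return-type declaration next to the function body.
    # `num_ride_10_dec` is an illustrative name, not part of the original example.
    @F.udf(returnType=IntegerType())
    def num_ride_10_dec(num):
        return num * 10

    df.select(num_ride_10_dec(df['num'])).show()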

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').\
        config('spark.sql.shuffle.partitions', '2').\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([['hadoop spark flink'], ['hadoop flink java']])
    df = rdd.toDF(['line'])
    df.show()

    # UDF that returns an array: split each line into words
    def split_line(data):
        return data.split(" ")

    udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

    # DSL style
    df.select(udf2(df['line'])).show()

    # SQL style
    df.createTempView('line')
    spark.sql('select udf1(line) from line').show()
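    # A hedged extra sketch: for UDTF-like behaviour (one input row expanding into several
    # output rows), the array returned by the UDF can be flattened with the standard
    # F.explode function; the alias 'word' is illustrative only.
    from pyspark.sql import functions as F
    df.select(F.explode(udf2(df['line'])).alias('word')).show()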

import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').\
        config('spark.sql.shuffle.partitions', '2').\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([[1], [2], [3]])
    df = rdd.toDF(['num'])

    # UDF that returns a struct: build a dict whose keys match the StructType fields
    def split_line(num):
        return {'num': num, 'letters': string.ascii_letters[num]}

    udf1 = spark.udf.register('udf1', split_line, StructType().add('num', IntegerType(), nullable=True).\
        add('letters', StringType(), nullable=True))

    df.createTempView('line')
    spark.sql("select udf1(num) from line").show()
A UDAF cannot be registered this way in PySpark, so RDD operators have to be used to simulate the aggregation, as the next example does by summing all rows with mapPartitions on a single partition.

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').\
        config('spark.sql.shuffle.partitions', '2').\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
    df = rdd.map(lambda x: [x]).toDF(['num'])

    # collapse to a single partition so one mapPartitions call sees every row
    single_partition_rdd = df.rdd.repartition(1)

    # aggregate the whole partition: sum the 'num' column and emit the result as a one-element list
    def process(rows):
        total = 0
        for row in rows:
            total += row['num']
        return [total]

    print(single_partition_rdd.mapPartitions(process).collect())
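    # A hedged alternative sketch (not part of the RDD simulation above): on Spark 3.x with
    # pandas and pyarrow installed, an aggregation can also be written as a pandas UDF.
    # `sum_num` is an illustrative name, not from the original example.
    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf('int')
    def sum_num(v: pd.Series) -> int:
        return int(v.sum())

    df.agg(sum_num(df['num'])).show()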
The ntile window function splits the result set into a specified number of approximately equal groups, or buckets, and returns the bucket number associated with each row. The name comes from the practice of dividing result sets into quarters (quartiles), tenths (deciles), and so on. For example, ntile(3) over 7 rows produces buckets of 3, 2 and 2 rows, with the leading buckets taking the extra rows.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').\
        config('spark.sql.shuffle.partitions', '2').\
        getOrCreate()
    sc = spark.sparkContext

    rdd = sc.parallelize([
        ('张三', 'class_1', 99), ('王五', 'class_2', 35), ('王三', 'class_3', 57),
        ('王久', 'class_4', 12), ('王丽', 'class_5', 99), ('王娟', 'class_1', 90),
        ('王军', 'class_2', 91), ('王俊', 'class_3', 33), ('王君', 'class_4', 55),
        ('王珺', 'class_5', 66), ('郑颖', 'class_1', 11), ('郑辉', 'class_2', 33),
        ('张丽', 'class_3', 36), ('张张', 'class_4', 79), ('黄凯', 'class_5', 90),
        ('黄开', 'class_1', 90), ('黄恺', 'class_2', 90), ('王凯', 'class_3', 11),
        ('王凯杰', 'class_1', 11), ('王开杰', 'class_2', 3), ('王景亮', 'class_3', 99)
    ])
    schema = StructType().add('name', StringType()).add('class', StringType()).add('score', IntegerType())
    df = rdd.toDF(schema)

    # these examples use window functions in SQL style, so register a temp view first
    df.createTempView('stu')

    # TODO: ranking window functions (row_number / dense_rank / rank)
    spark.sql("""
        select *,
               row_number() over(order by score desc) as row_number_rank,
               dense_rank() over(partition by class order by score desc) as dense_rank,
               rank() over(order by score desc) as rank
        from stu
    """).show()

    # TODO: ntile window function
    spark.sql("""
        select *, ntile(6) over(order by score desc) as ntile_6 from stu
    """).show()