Intro
常用hive字符串拼接函数,转json等操作
import
from pyspark.sql import SparkSession
# Create a SparkSession via the builder:
# .appName("...") names the application; .getOrCreate() returns an existing
# session if one is already active, otherwise creates a new one.
# Fixed typo in the app name: "pysaprk" -> "pyspark".
spark = SparkSession.builder.appName("pyspark").getOrCreate()
import pyspark.sql.functions
# Build a small demo DataFrame: id, sex, age (stored as a string),
# and a creation-timestamp string.
rows = [
    (1, "male", "18", "2019-01-01 11:45:50"),
    (2, "female", "37", "2019-01-02 11:55:50"),
    (3, "male", "18", "2019-01-21 11:45:50"),
    (4, "female", "44", "2019-02-01 12:45:50"),
    (5, "male", "39", "2019-01-15 10:40:50"),
]
columns = ["id", "sex", "age", "createtime_str"]
df = spark.createDataFrame(rows, columns)
df.show(20, truncate=False)
+---+------+---+-------------------+
|id |sex |age|createtime_str |
+---+------+---+-------------------+
|1 |male |18 |2019-01-01 11:45:50|
|2 |female|37 |2019-01-02 11:55:50|
|3 |male |18 |2019-01-21 11:45:50|
|4 |female|44 |2019-02-01 12:45:50|
|5 |male |39 |2019-01-15 10:40:50|
+---+------+---+-------------------+
# Register the DataFrame as a temporary view so it can be queried with spark.sql.
df.createOrReplaceTempView("temp")
struct
结构体,转json的中间过程
# named_struct builds a struct column — the intermediate step before to_json.
struct_query = """
select named_struct("id",id) as col_struct
from temp
"""
df_struct = spark.sql(struct_query)
df_struct.show()
+----------+
|col_struct|
+----------+
| [1]|
| [2]|
| [3]|
| [4]|
| [5]|
+----------+
# Inspect the schema: col_struct is a struct with a single long field "id".
df_struct.printSchema()
root
|-- col_struct: struct (nullable = false)
| |-- id: long (nullable = true)
转json
利用to_json把上面的结构体直接转成json字符串
# to_json serializes the struct built by named_struct directly into a JSON string.
json_query = """
select to_json(named_struct("id",id)) as json
from temp
"""
df_json = spark.sql(json_query)
df_json.show()
+--------+
| json|
+--------+
|{"id":1}|
|{"id":2}|
|{"id":3}|
|{"id":4}|
|{"id":5}|
+--------+
# The result of to_json is a plain string column.
df_json.printSchema()
root
|-- json: string (nullable = true)
concat
concat常规的字符串拼接函数,没啥,看个例子
# concat: plain string concatenation of literals and columns.
concat_query = """
select concat("id",":",id) as col_concat
from temp
"""
spark.sql(concat_query).show()
+----------+
|col_concat|
+----------+
| id:1|
| id:2|
| id:3|
| id:4|
| id:5|
+----------+
concat_ws
concat_ws(separator, string s1, string s2…)
采取分隔符,把各个字段连接起来
# concat_ws(separator, ...) joins the fields with the given separator;
# the plain concat in the second column produces the same output here.
ws_query = """
select concat_ws(",",id,sex) as col_concat_ws,
concat(id,",",sex) as next
from temp
"""
spark.sql(ws_query).show()
+-------------+--------+
|col_concat_ws| next|
+-------------+--------+
| 1,male| 1,male|
| 2,female|2,female|
| 3,male| 3,male|
| 4,female|4,female|
| 5,male| 5,male|
+-------------+--------+
collect_list和collect_set
和group by连用,把分组中某一列转成一个数组。collect_list不去重,collect_set去重
# Used with group by: collect_set aggregates distinct values into an array,
# collect_list keeps duplicates.
collect_query = """
select sex
,collect_set(age) as set_col
,collect_list(age) as list_age
from temp
group by sex
"""
df_collect = spark.sql(collect_query)
df_collect.show()
+------+--------+------------+
| sex| set_col| list_age|
+------+--------+------------+
|female|[44, 37]| [37, 44]|
| male|[39, 18]|[18, 18, 39]|
+------+--------+------------+
# Both collect_set and collect_list produce array<string> columns here.
df_collect.printSchema()
root
|-- sex: string (nullable = true)
|-- set_col: array (nullable = true)
| |-- element: string (containsNull = true)
|-- list_age: array (nullable = true)
| |-- element: string (containsNull = true)
可以看到collect之后以array格式储存
collect_list和concat_ws数组转字符串
# concat_ws flattens the collected array into one comma-separated string.
list_to_str_query = """
select sex
,collect_list(age) as list_age
,concat_ws(",",collect_list(age)) as age_str
from temp
group by sex
"""
spark.sql(list_to_str_query).show(truncate=False)
+------+------------+--------+
|sex |list_age |age_str |
+------+------------+--------+
|female|[37, 44] |37,44 |
|male |[18, 18, 39]|18,18,39|
+------+------------+--------+
利用collect_list拼接嵌套json
# Build nested JSON from collect_list. Note: json_str2 embeds the inner
# to_json result as an *escaped string* value, not as a nested JSON object.
nested_query = """
select sex
,to_json(named_struct("age_list",list_age)) as json_str1
,to_json(named_struct("sex",sex,"data",to_json(named_struct("age_list",list_age)) )) as json_str2
from
(select sex
,collect_list(age) as list_age
,concat_ws(",",collect_list(age)) as age_str
from temp
group by sex ) as t
"""
df_json2 = spark.sql(nested_query)
df_json2.show(truncate=False)
+------+-----------------------------+-------------------------------------------------------------+
|sex |json_str1 |json_str2 |
+------+-----------------------------+-------------------------------------------------------------+
|female|{"age_list":["37","44"]} |{"sex":"female","data":"{\"age_list\":[\"37\",\"44\"]}"} |
|male |{"age_list":["18","18","39"]}|{"sex":"male","data":"{\"age_list\":[\"18\",\"18\",\"39\"]}"}|
+------+-----------------------------+-------------------------------------------------------------+
# Both JSON columns are plain strings.
df_json2.printSchema()
root
|-- sex: string (nullable = true)
|-- json_str1: string (nullable = true)
|-- json_str2: string (nullable = true)
2022-02-16 于南京市江宁区九龙湖