0
点赞
收藏
分享

微信扫一扫

通过实例学习 PySpark

通过实例学习 PySpark

最近学习了一下 PySpark, 目标是在工作中能将其用上. 在实践过程中发现, 通过一个个具体的问题来进行学习, 很多内容掌握起来更为容易. 因此后面如果写相关的文章, 也会采用实例的方式来介绍.

下面要解决的问题是:

假设用户购买商品, 其点击时间记录在点击表中, 其下单时间记录在下单表中, 另外还有一张表记录用户的特征. 现在的目标是, 获取每个用户从点击时间到下单时间的时间间隔, 并和特征进行拼接. 比如用户 A:

用户    点击时间     下单时间      时间间隔    特征
user1 2020-05-13 2020-05-14 24*3600s sex:male age:28

了解问题的目标之后, 首先是获取原始数据.

原始数据获取

## data.py
click_time = [
['user1', '2020-05-13 10:46:43'],
['user2', '2020-05-22 08:26:42'],
['user3', '2020-05-17 02:42:31'],
['user4', '2020-05-23 18:25:23'],
['user5', '2020-05-19 13:29:05'],
['user6', '2020-05-16 19:48:23'],
['user7', '2020-05-20 16:56:13'],
]

order_time = [
['user3', '2020-05-18 10:46:43'],
['user1', '2020-05-22 08:26:42'],
['user5', '2020-05-27 02:42:31'],
['user7', '2020-05-23 18:25:23'],
['user4', '2020-05-29 13:29:05'],
['user2', '2020-05-26 19:48:23'],
['user6', '2020-05-20 16:56:13'],
]

features = [
['age:26', 'weight:70', 'sex:male', 'id:user2'],
['weight:50', 'age:22', 'sex:female', 'id:user1'],
['weight:70', 'sex:male'],
['age:16', 'weight:63', 'sex:male', 'id:user7'],
['age:22', 'sex:male'],
['age:33', 'weight:72', 'sex:female', 'id:user5'],
['weight:63', 'age:46', 'sex:female', 'id:user4'],
['age:45', 'weight:73'],
]

with open('click_time.txt', 'w') as f:
content = '\n'.join(['\t'.join(item) for item in click_time])
f.write('{}\n'.format(content))

with open('order_time.txt', 'w') as f:
content = '\n'.join(['\t'.join(item) for item in order_time])
f.write('{}\n'.format(content))

with open('features.txt', 'w') as f:
content = '\n'.join(['\t'.join(item) for item in features])
f.write('{}\n'.format(content))

SparkSession

下一步, 创建 SparkSession.

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder \
.appName('test') \
.master('local') \
.enableHiveSupport() \
.getOrCreate()

加载数据

数据文件生成后, 现在使用 PySpark 读入文件生成 DataFrame. 目前我发现有三种读入文件的方法.

加载点击时间表

  1. 使用 HiveQL 语句读入文件:

click_table = 'click_table'
click_file = 'click_time.txt'
spark.sql("""
create table if not exists `{}` (
id STRING,
click_time STRING
)
using hive options (fileFormat 'textfile', fieldDelim '\t')
""".format(click_table))
spark.sql("""
load data local inpath '{click_file}' into table `{click_table}`
""".format(click_file=click_file, click_table=click_table))
click_df = spark.sql("select * from `{}`".format(click_table))

其中 ​​using hive options (fileFormat 'textfile', fieldDelim '\t')​​​ 也可以用 ​​row format delimited fields terminated by '\t'​​ 替换.

  1. 使用​​spark.read​​ 读入文件:

click_file = 'click_time.txt'
df = spark.read.text(click_file).toDF('info')

  1. 使用​​sc.textFile​​ 读入文件:

click_file = 'click_time.txt'
sc = spark.sparkContext
df = sc.textFile(click_file).map(lambda x: Row(info=x)).toDF()

需要注意的是, 使用第 2 以及第 3 种方法, 为了获得 ​​id​​​ 以及 ​​click_time​​ 两个 field, 还需要额外的处理:

def time_split(row):
id, time = row.split('\t')
return (id, time)

click_df = spark.createDataFrame(
df.rdd.map(lambda x: time_split(x[0])),
['id', 'click_time']
)

注意 ​​df​​​ 是 ​​DataFrame​​​ 对象, 使用 ​​.rdd​​​ 转换为 RDD 对象, 之后使用 ​​.map​​​ 方法处理 ​​RDD​​​ 中的每个 ​​Row​​​ 对象, 在 ​​How to get a value from the Row object in Spark Dataframe?
​​ 中谈到, ​​Row​​​ 继承于 ​​namedtuple​​​, 因此代码中的 ​​x[0]​​​ (通过索引访问) 含义是取出 Row 中的值, 当然, 可以使用 ​​x.info​​ (通过 field 访问) 获取 Row 中的值.

生成 ​​click_df​​ 后, 可以显示部分数据看看是否符合预期:

click_df.limit(3).show()
## 或者
click_df.show(3)

效果如下:


通过实例学习 PySpark_python

注意: 后面为了数据处理的一致性, 我一律采用第二种方法来读入数据.

加载下单时间表

基本和加载点击时间表逻辑相同.

order_file = 'order_time.txt'
df = spark.read.text(order_file).toDF('info')
order_df = spark.createDataFrame(
df.rdd.map(lambda x: time_split(x[0])),
['id', 'order_time']
)

加载 user 特征表

注意对特征表的过滤, 因为有些记录存在内容缺失, 比如找不到 ​​id​​​ 或者 ​​sex​​.

def feature_split(row):
row = row.split('\t')
feature_dict = {item.split(':')[0]: item.split(':')[1] for item in row}
return feature_dict

df = spark.read.text('features.txt').toDF('info')
feature_df = spark.createDataFrame(
df.rdd.map(lambda x: feature_split(x[0])),
['id', 'sex', 'age', 'weight']
)
feature_df = feature_df.filter((feature_df.id.isNotNull()) & \
(feature_df.sex.isNotNull())
)

schema

注意在加载 user 特征表时, 对于 ​​sex​​​, ​​age​​​, ​​weight​​​ 之类的特征, 没有指定它们的类型, 可能默认就是字符串类型了. 为了显式指定对应的类型, 需要自定义 schema, 参考: ​​pyspark: ValueError: Some of types cannot be determined after inferring​​​. (我原来遇到过一个错误: ​​ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling​​ 也可以通过自定义 Schema 解决).

代码如下:

在 ​​StructField​​​ 中的第三个参数含义为: ​​Boolean nullable​​​, 即是否可以被设置为 ​​null​​​. (具体参见: ​​spark.sql.types.StructField​​)

之所以下面的 schema 中全部设置为 ​​StringType​​​, 是因为我在 ​​feature_split​​​ 函数中将结果均表示成字符串的形式, 比如 ​​age​​​ 中的结果不是 int 而是字符串. 如果希望设置 ​​age​​​ 为 ​​IntegerType​​​, 那么应该修改 ​​feature_split​​​ 中的代码, 将 ​​age​​​ 对应的结果用 ​​int()​​ 方法做转换.

from pyspark.sql.types import (StructType, 
StructField,
StringType,
IntegerType,
DoubleType,
)

df = spark.read.text('features.txt').toDF('info')
schema = StructType([StructField("id", StringType(), True),
StructField("sex", StringType(), True),
StructField("age", StringType(), True),
StructField("weight", StringType(), True),
])
feature_df = spark.createDataFrame(
df.rdd.map(lambda x: feature_split(x[0])),
schema=schema
)
feature_df = feature_df.filter((feature_df.id.isNotNull()) & \
(feature_df.sex.isNotNull()) & \
(feature_df.age.isNotNull()) & \
(feature_df.weight.isNotNull())
)

获取下单和点击的时间间隔

为了获取下单和点击的时间间隔, 需要将点击时间表和下单时间表进行 Left Outer Join, 以获取每个 user 对应的点击时间以及下单时间. 然而, 由于 user 可能只进行点击而未下单, 因此要对结果过滤.

join_df = click_df.join(order_df, click_df.id == order_df.id, how='left') \
.select(click_df.id, click_df.click_time, order_df.order_time) \
.filter(order_df.order_time.isNotNull())

上面代码获取了每个 user 对应的点击时间以及下单时间, 为了获得时间间隔, 需要额外的函数进行处理:

from datetime import datetime

def convert2datetime(s, format='%Y-%m-%d %H:%M:%S'):
return datetime.strptime(s, format)

def convert2str(s, format='%Y-%m-%d %H:%M:%S'):
return datetime.strftime(s, format)

def convert(row):
id, click_time, order_time = row
click_time, order_time = list(map(convert2datetime, [click_time, order_time]))
diff = (order_time - click_time).total_seconds()
return (id, diff)

join_df = spark.createDataFrame(join_df.rdd.map(convert), ['id', 'diff'])

Join 特征表

最后只需要 Join 特征表就能达到我们最终的目的:

final_df = join_df.join(feature_df, join_df.id == feature_df.id, how='inner') \
.select(join_df.id, feature_df.sex, feature_df.age, feature_df.weight)

可以考虑使用 ​​.show()​​ 输出结果看看是否符合预期. 此外, 如果想将结果保存在目录中, 可以使用如下方式完成:

def create(row):
row = map(str, row)
line = '\t'.join(row)
return line

output_dir = 'output'
final_df.rdd.map(create).repartition(2).saveAsTextFile(output_dir)

另外注意, 如果 ​​output_dir​​ 已经存在, 需要提前删除, 否则程序会报错.

观察 ​​output​​ 的文件:


通过实例学习 PySpark_pyspark_02

可以发现结果保存在两个分区中, 比如 ​​part-00001​​ 中保存着:


通过实例学习 PySpark_sql_03

终曲 & 尾声

不要忘记

spark.stop()

完整代码

以上完整代码如下, 运行起来, 去感受 Spark 的强大 ????????????

(发表完博客后补充: 文章在草稿中保存了几天, 发出来后发现, 结果好像跟一开始设置的目标不太一样啊 ???????????? 忘了把时间间隔加到结果中了, 不过这无伤大雅~ 果然写博客还是得一气呵成! )

from datetime import datetime
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import (StructType,
StructField,
StringType,
IntegerType,
DoubleType,
)

def time_split(row):
id, time = row.split('\t')
return (id, time)

def feature_split(row):
row = row.split('\t')
feature_dict = {item.split(':')[0]: item.split(':')[1] for item in row}
return feature_dict

def convert2datetime(s, format='%Y-%m-%d %H:%M:%S'):
return datetime.strptime(s, format)

def convert2str(s, format='%Y-%m-%d %H:%M:%S'):
return datetime.strftime(s, format)

def convert(row):
id, click_time, order_time = row
click_time, order_time = list(map(convert2datetime, [click_time, order_time]))
diff = (order_time - click_time).total_seconds()
return (id, diff)

def create(row):
row = map(str, row)
line = '\t'.join(row)
return line

spark = SparkSession.builder \
.appName('test') \
.master('local') \
.enableHiveSupport() \
.getOrCreate()

click_file = 'click_time.txt'
df = spark.read.text(click_file).toDF('info')
click_df = spark.createDataFrame(
df.rdd.map(lambda x: time_split(x[0])),
['id', 'click_time']
)

click_df.show(3)

order_file = 'order_time.txt'
df = spark.read.text(order_file).toDF('info')
order_df = spark.createDataFrame(
df.rdd.map(lambda x: time_split(x[0])),
['id', 'order_time']
)

df = spark.read.text('features.txt').toDF('info')
schema = StructType([StructField("id", StringType(), True),
StructField("sex", StringType(), True),
StructField("age", StringType(), True),
StructField("weight", StringType(), True),
])
feature_df = spark.createDataFrame(
df.rdd.map(lambda x: feature_split(x[0])),
schema=schema
)
feature_df = feature_df.filter((feature_df.id.isNotNull()) & \
(feature_df.sex.isNotNull()) & \
(feature_df.age.isNotNull()) & \
(feature_df.weight.isNotNull())
)

join_df = click_df.join(order_df, click_df.id == order_df.id, how='left') \
.select(click_df.id, click_df.click_time, order_df.order_time) \
.filter(order_df.order_time.isNotNull())

join_df = spark.createDataFrame(join_df.rdd.map(convert), ['id', 'diff'])

feature_df.show(3)

final_df = join_df.join(feature_df, join_df.id == feature_df.id, how='inner') \
.select(join_df.id, feature_df.sex, feature_df.age, feature_df.weight)

output_dir = 'output'
final_df.rdd.map(create).repartition(2).saveAsTextFile(output_dir)

spark.stop()

举报

相关推荐

0 条评论