0
点赞
收藏
分享

微信扫一扫

Apache Spark 练习七:使用Spark分析音乐专辑数据


一、源数据

本章所分析的数据来自于Kaggle公开的化妆品电子商务数据(​​https://www.kaggle.com/datasets/nowingkim/ecommerce-data-cosmetics-shop​​)。该数据具体包括以下字段:

column name

description

time

Time when event happened at (in UTC).

event_name

4 kinds of value: purchase, cart, view, removefromcart

product_id

ID of a product

category_id

Product's category ID

category_name

Product's category taxonomy (code name) if it was possible to make it. Usually present for meaningful categories and skipped for different kinds of accessories.

brand

Downcased string of brand name.

price

Float price of a product.

user_id

Permanent user ID.

session

Temporary user's session ID. Same for each user's session. Is changed every time user come back to online store from a long pause.

category_1

Largest class of product included

category_2

Bigger class of product included

category_3

Smallest class of product included

在开始下面的练习前,将csv文件中的数据全部写入到Kafka的“E_Commerce”消息主题中。

二、练习题

0. 数据预处理

首先,我们对Kafka中的原始数据进行预处理,得到Structured Streaming Dateset。

val spark = SparkSession
.builder()
.appName("Online Shopping")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._

// 从Kafka中加载源数据
val schema = new StructType()
.add("time", "timestamp")
.add("event_name", "string")
.add("product_id", "string")
.add("category_id", "string")
.add("category_name", "string")
.add("brand", "string")
.add("price", "double")
.add("user_id", "string")
.add("session", "string")
.add("category_1", "string")
.add("category_2", "string")
.add("category_3", "string")
val df = spark.readStream
.format("kafka")
// Kafka配置:IP:localhost:9092;Topic:E_Commerce
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "E_Commerce")
// 为更好的进行模拟,设置从头开始消费数据
.option("startingOffsets", "earliest")
// 为模拟micro-batch的效果,设置每一批数据的最大规模为100000
.option("maxOffsetsPerTrigger", 100000)
.load()
// 从Kafka的消息中提取value
.selectExpr("CAST(value AS STRING)")
// 将按逗号分隔的数据切分为列
.select(
from_csv(
$"value",
schema,
Map("sep" -> ",", "timestampFormat" -> "yyyy-MM-dd HH:mm:ssZ")
).as("data")
)
.select("data.*")

1. 找出购买了价格超过100元的家具类商品的购买时间、商品ID和用户ID

val res = df
.where("price > 100 and category_1 = 'furniture'")
.select("time", "product_id", "user_id")

2. 统计每种品牌的累计销售金额

val res = df
.filter($"brand" =!= "Not defined" and $"event_name" === "purchase")
.groupBy($"brand")
.sum("price")

3. 找出实时交易金额最高的10个品牌名(不含Not defined)

val res = df
.withWatermark("time", "10 minute")
.filter($"event_name" === "purchase" and $"brand" =!= "Not defined")
.groupBy($"brand")
.sum("price")
.orderBy($"sum(price)".desc)
.limit(10)

4. 统计每小时的用户浏览量、用户购买量

val res = df
.withWatermark("time", "10 minutes")
.filter($"event_name" === "view" or $"event_name" === "purchase")
.groupBy(window($"time", "1 hour", "1 hour", "0 seconds"))
.agg(
sum(when($"event_name" === "view", 1).otherwise(0)).as("total_view"),
sum(when($"event_name" === "purchase", 1).otherwise(0)).as("total_purchase")
)

5. 统计每天的交易金额

val res = df
.withWatermark("time", "10 minutes")
.filter($"event_name" === "purchase")
.groupBy(window($"time", "1 day"))
.sum("price")

6. 找出每天浏览量最高的10款商品的商品ID

val res = df
.withWatermark("time", "10 minutes")
.filter($"event_name" === "view")
.groupBy(window($"time", "1 day"), $"product_id")
.count()
.orderBy($"window".desc, $"count".desc)
.limit(10)

7. 找出添加购物车后24小时内购买该商品的用户

val res = df
.withWatermark("time", "10 minutes")
.filter($"event_name" === "cart")
.as("cart")
.join(
df.withWatermark("time", "10 minutes")
.filter($"event_name" === "purchase")
.as("purchase"),
expr("""
|cart.user_id = purchase.user_id AND
|purchase.time >= cart.time AND
|purchase.time <= cart.time + interval 24 hours
|""".stripMargin),
joinType = "inner"
)
.select("cart.user_id")

8. 找出浏览了商品但在24小时内未购买它的用户

val res = df
.withWatermark("time", "10 minutes")
.filter($"event_name" === "view")
.as("view")
.join(
df.withWatermark("time", "10 minutes")
.filter($"event_name" === "purchase")
.as("purchase"),
expr("""
|view.user_id = purchase.user_id AND
|purchase.time >= view.time AND
|purchase.time <= view.time + interval 24 hours
|""".stripMargin),
joinType = "leftOuter"
)
.where("purchase.user_id IS NULL")
.select("view.user_id")

举报

相关推荐

0 条评论