Using Spark window functions


1. Using window for ranking

Consider this requirement: for each seller, rank the order_ids by sale amount.

package com.waitingfy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


object WindowTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("WindowTest")
      .getOrCreate()
    import spark.implicits._

    val orders = Seq(
      ("1", "s1", "2017-05-01", 100),
      ("2", "s1", "2017-05-02", 200),
      ("3", "s1", "2017-05-02", 200),
      ("4", "s2", "2017-05-01", 300),
      ("5", "s2", "2017-05-01", 100),
      ("6", "s3", "2017-05-01", 100),
      ("6", "s3", "2017-05-02", 50)
    ).toDF("order_id", "seller_id", "order_date", "price")

    // partition by seller, order by price descending within each seller
    val rankSpec = Window.partitionBy("seller_id").orderBy(orders("price").desc)

    val shopOrderRank =
      orders.withColumn("rank", dense_rank().over(rankSpec))

    shopOrderRank.show()

    spark.close()
  }
}

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   1|
|       1|       s1|2017-05-01|  100|   2|
+--------+---------+----------+-----+----+
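
If only the highest-priced order per seller is needed, the ranked DataFrame can simply be filtered. A minimal sketch, reusing the shopOrderRank DataFrame from the code above:

    // keep only the top-ranked order(s) for each seller
    shopOrderRank
      .filter($"rank" === 1)
      .select("seller_id", "order_id", "price")
      .show()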

Note that dense_rank is used here, so orders with the same amount get the same rank, and the next rank has no gap.
If rank is used instead, the result is:

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   1|
|       1|       s1|2017-05-01|  100|   3|
+--------+---------+----------+-----+----+

If row_number is used, the result is:

+--------+---------+----------+-----+----+
|order_id|seller_id|order_date|price|rank|
+--------+---------+----------+-----+----+
|       4|       s2|2017-05-01|  300|   1|
|       5|       s2|2017-05-01|  100|   2|
|       6|       s3|2017-05-01|  100|   1|
|       6|       s3|2017-05-02|   50|   2|
|       2|       s1|2017-05-02|  200|   1|
|       3|       s1|2017-05-02|  200|   2|
|       1|       s1|2017-05-01|  100|   3|
+--------+---------+----------+-----+----+
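
To see the three behaviors side by side, a minimal sketch (assuming the same orders DataFrame and rankSpec defined above) can compute all of them over the same window:

    // rank: ties share a rank, and the next rank is skipped
    // dense_rank: ties share a rank, no gap afterwards
    // row_number: ties are still numbered sequentially
    val compared = orders
      .withColumn("rank", rank().over(rankSpec))
      .withColumn("dense_rank", dense_rank().over(rankSpec))
      .withColumn("row_number", row_number().over(rankSpec))

    compared.show()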

2. Moving average or moving sum: using rowsBetween

Now the requirement is to compute, for each seller, the average price of the most recent 2 orders.

package com.waitingfy

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


object WindowTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("WindowTest")
      .getOrCreate()
    import spark.implicits._

    val orders = Seq(
      ("1", "s1", "2017-05-01", 100),
      ("2", "s1", "2017-05-02", 200),
      ("3", "s1", "2017-05-02", 200),
      ("4", "s2", "2017-05-01", 300),
      ("5", "s2", "2017-05-01", 100),
      ("6", "s3", "2017-05-01", 100),
      ("6", "s3", "2017-05-02", 50)
    ).toDF("order_id", "seller_id", "order_date", "price")

    // frame covers the previous row and the current row, ordered by date within each seller
    val rankSpec = Window.partitionBy("seller_id").orderBy(orders("order_date")).rowsBetween(-1, 0)

    val shopOrderRank =
      orders.withColumn("avg sum", avg("price").over(rankSpec))

    shopOrderRank.show()

    spark.close()
  }
}

Note the use of rowsBetween: rowsBetween(-1, 0) makes the frame span the previous row and the current row.


+--------+---------+----------+-----+-------+
|order_id|seller_id|order_date|price|avg sum|
+--------+---------+----------+-----+-------+
|       4|       s2|2017-05-01|  300|  300.0|
|       5|       s2|2017-05-01|  100|  200.0|
|       6|       s3|2017-05-01|  100|  100.0|
|       6|       s3|2017-05-02|   50|   75.0|
|       1|       s1|2017-05-01|  100|  100.0|
|       2|       s1|2017-05-02|  200|  150.0|
|       3|       s1|2017-05-02|  200|  200.0|
+--------+---------+----------+-----+-------+
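
The same frame clause works with other aggregates. As a further sketch (assuming the same orders DataFrame; running_sum is just an illustrative column name), a per-seller running total can use the Window.unboundedPreceding and Window.currentRow constants instead of raw offsets:

    // running total of price per seller, from the first row up to the current row
    val runningSpec = Window
      .partitionBy("seller_id")
      .orderBy("order_date")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    orders.withColumn("running_sum", sum("price").over(runningSpec)).show()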

