
PysparkNote103---window sliding windows

RJ_Hwang 2022-08-04


Intro

    The window function is mainly used for time-based statistics. Example: count how many times each person below clicks in every 15-minute interval.

Building the data

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def get_or_create(app_name):
    # Build (or reuse) a SparkSession with the settings used throughout this note
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    return spark

import pandas as pd

# Toy click log: every click has weight 1, and click_time is still a plain string
df = pd.DataFrame({'name': ['A', 'A', 'A', 'B', 'B'], 'click': [1, 1, 1, 1, 1],
                   'click_time': ['2022-08-01 11:59:59', '2022-08-01 12:02:00', '2022-08-01 12:03:59',
                                  '2022-08-01 12:00:10', '2022-08-01 12:02:10']})



  name  click           click_time
0    A      1  2022-08-01 11:59:59
1    A      1  2022-08-01 12:02:00
2    A      1  2022-08-01 12:03:59
3    B      1  2022-08-01 12:00:10
4    B      1  2022-08-01 12:02:10

spark = get_or_create('spark')
df_spark = spark.createDataFrame(df)
df_spark.show(truncate=False)

D:\code\spark\python\pyspark\sql\session.py:714: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
warnings.warn(msg)


+----+-----+-------------------+
|name|click|click_time |
+----+-----+-------------------+
|A |1 |2022-08-01 11:59:59|
|A |1 |2022-08-01 12:02:00|
|A |1 |2022-08-01 12:03:59|
|B |1 |2022-08-01 12:00:10|
|B |1 |2022-08-01 12:02:10|
+----+-----+-------------------+

df_spark.printSchema()

root
|-- name: string (nullable = true)
|-- click: long (nullable = true)
|-- click_time: string (nullable = true)

Sliding-window aggregation

Windows are left-closed and right-open: a row with time t belongs to [start, end) when start <= t < end.
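A tiny stdlib-only check of that half-open rule, using the click at 12:02:00 from the data above. The helper `in_window` is a made-up name for illustration, not part of PySpark.

```python
from datetime import datetime

def in_window(ts, start, end):
    """Hypothetical helper: half-open membership test, start <= ts < end."""
    return start <= ts < end

click = datetime(2022, 8, 1, 12, 2, 0)

# Window [11:57, 12:02): the click sits exactly on the end, so it is excluded.
print(in_window(click, datetime(2022, 8, 1, 11, 57), datetime(2022, 8, 1, 12, 2)))  # False
# Window [12:02, 12:07): the click sits exactly on the start, so it is included.
print(in_window(click, datetime(2022, 8, 1, 12, 2), datetime(2022, 8, 1, 12, 7)))   # True
```

This is why, in the 1-minute sliding table further down, the [11:57, 12:02) row for A counts only the 11:59:59 click.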
Parameters:

  • timeColumn: the column to window on. It must be of type pyspark.sql.types.TimestampType, so a string time column has to be converted first.
  • windowDuration: the window width, given as a string such as '1 second', '1 day 12 hours' or '2 minutes'. Valid interval units are 'week', 'day', 'hour', 'minute', 'second', 'millisecond' and 'microsecond'.
  • slideDuration: how far each window slides. If slideDuration is not provided (None), the windows are tumbling windows.
  • startTime: the offset with respect to 1970-01-01 00:00:00 UTC at which window intervals start. For example, to get hourly tumbling windows that start 15 minutes past the hour (12:15-13:15, 13:15-14:15, ...), pass startTime as '15 minutes'.
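To make the three durations concrete, here is a pure-Python sketch of which windows a single timestamp falls into. It assumes windows start at 1970-01-01 00:00:00 + startTime + k * slideDuration (k an integer) and last windowDuration, with naive datetimes treated as UTC. `windows_for` is a hypothetical helper, not Spark's internal code, although it gives the same windows as the tables in this note.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetimes are treated as UTC in this sketch

def windows_for(ts, window, slide=None, offset=timedelta(0)):
    """Hypothetical helper: every [start, end) window that contains ts.

    window -> windowDuration, slide -> slideDuration, offset -> startTime.
    slide=None means tumbling windows, i.e. slide == window.
    """
    slide = slide or window
    # Latest window start at or before ts; starts sit on offset + k * slide.
    start = ts - (ts - EPOCH - offset) % slide
    found = []
    # Step back one slide at a time while the window still covers ts (end is exclusive).
    while start + window > ts:
        found.append((start, start + window))
        start -= slide
    return sorted(found)

five, one = timedelta(minutes=5), timedelta(minutes=1)
for start, end in windows_for(datetime(2022, 8, 1, 12, 2, 0), five, one):
    print(start.time(), '->', end.time())
```

Calling `windows_for(datetime(2022, 8, 1, 12, 20), timedelta(hours=1), offset=timedelta(minutes=15))` returns the single window 12:15-13:15, matching the startTime example in the list above.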

df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time'))).show()

+----+-----+-------------------+-------------------+
|name|click| click_time| click_timestamp|
+----+-----+-------------------+-------------------+
| A| 1|2022-08-01 11:59:59|2022-08-01 11:59:59|
| A| 1|2022-08-01 12:02:00|2022-08-01 12:02:00|
| A| 1|2022-08-01 12:03:59|2022-08-01 12:03:59|
| B| 1|2022-08-01 12:00:10|2022-08-01 12:00:10|
| B| 1|2022-08-01 12:02:10|2022-08-01 12:02:10|
+----+-----+-------------------+-------------------+

df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time'))).printSchema()

root
|-- name: string (nullable = true)
|-- click: long (nullable = true)
|-- click_time: string (nullable = true)
|-- click_timestamp: timestamp (nullable = true)

5-minute window, sliding every minute

df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
.groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration='1 minute', startTime=None)])\
.agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
.withColumn('window_start',F.col('window.start'))\
.withColumn('window_end',F.col('window.end'))\
.orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)

+----+------------------------------------------+---+---+-------------------+-------------------+
|name|window |cnt|dnt|window_start |window_end |
+----+------------------------------------------+---+---+-------------------+-------------------+
|A |[2022-08-01 11:55:00, 2022-08-01 12:00:00]|1 |1 |2022-08-01 11:55:00|2022-08-01 12:00:00|
|A |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1 |1 |2022-08-01 11:56:00|2022-08-01 12:01:00|
|A |[2022-08-01 11:57:00, 2022-08-01 12:02:00]|1 |1 |2022-08-01 11:57:00|2022-08-01 12:02:00|
|A |[2022-08-01 11:58:00, 2022-08-01 12:03:00]|2 |1 |2022-08-01 11:58:00|2022-08-01 12:03:00|
|A |[2022-08-01 11:59:00, 2022-08-01 12:04:00]|3 |1 |2022-08-01 11:59:00|2022-08-01 12:04:00|
|A |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2 |1 |2022-08-01 12:00:00|2022-08-01 12:05:00|
|A |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|2 |1 |2022-08-01 12:01:00|2022-08-01 12:06:00|
|A |[2022-08-01 12:02:00, 2022-08-01 12:07:00]|2 |1 |2022-08-01 12:02:00|2022-08-01 12:07:00|
|A |[2022-08-01 12:03:00, 2022-08-01 12:08:00]|1 |1 |2022-08-01 12:03:00|2022-08-01 12:08:00|
|B |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1 |1 |2022-08-01 11:56:00|2022-08-01 12:01:00|
|B |[2022-08-01 11:57:00, 2022-08-01 12:02:00]|1 |1 |2022-08-01 11:57:00|2022-08-01 12:02:00|
|B |[2022-08-01 11:58:00, 2022-08-01 12:03:00]|2 |1 |2022-08-01 11:58:00|2022-08-01 12:03:00|
|B |[2022-08-01 11:59:00, 2022-08-01 12:04:00]|2 |1 |2022-08-01 11:59:00|2022-08-01 12:04:00|
|B |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2 |1 |2022-08-01 12:00:00|2022-08-01 12:05:00|
|B |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|1 |1 |2022-08-01 12:01:00|2022-08-01 12:06:00|
|B |[2022-08-01 12:02:00, 2022-08-01 12:07:00]|1 |1 |2022-08-01 12:02:00|2022-08-01 12:07:00|
+----+------------------------------------------+---+---+-------------------+-------------------+
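As a sanity check that needs no Spark session, the same 5-minute / 1-minute counts can be recomputed in plain Python. This sketch assumes window starts are aligned to whole minutes since 1970-01-01 (naive datetimes treated as UTC) and applies the half-open rule; `window_starts` and `sliding_counts` are illustrative names only.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetimes are treated as UTC in this sketch

clicks = [
    ("A", "2022-08-01 11:59:59"),
    ("A", "2022-08-01 12:02:00"),
    ("A", "2022-08-01 12:03:59"),
    ("B", "2022-08-01 12:00:10"),
    ("B", "2022-08-01 12:02:10"),
]

def window_starts(ts, window, slide):
    """Yield the start of every [start, start + window) that contains ts."""
    start = ts - (ts - EPOCH) % slide  # latest slide boundary at or before ts
    while start + window > ts:         # end is exclusive
        yield start
        start -= slide

def sliding_counts(rows, window, slide):
    """Count clicks per (name, window start), like groupby + count in Spark."""
    counts = Counter()
    for name, text in rows:
        ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
        for start in window_starts(ts, window, slide):
            counts[(name, start)] += 1
    return counts

counts = sliding_counts(clicks, timedelta(minutes=5), timedelta(minutes=1))
for (name, start), cnt in sorted(counts.items()):
    print(name, start, start + timedelta(minutes=5), cnt)
```

The printed rows line up with the cnt column of the Spark output above, window by window.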

5-minute window, tumbling

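A tumbling window is simply a sliding window whose slide equals its width. Passing slideDuration='5 minute' explicitly and leaving slideDuration=None give the same result, as the two runs below show.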

df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
.groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration='5 minute', startTime=None)])\
.agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
.withColumn('window_start',F.col('window.start'))\
.withColumn('window_end',F.col('window.end'))\
.orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)

+----+------------------------------------------+---+---+-------------------+-------------------+
|name|window |cnt|dnt|window_start |window_end |
+----+------------------------------------------+---+---+-------------------+-------------------+
|A |[2022-08-01 11:55:00, 2022-08-01 12:00:00]|1 |1 |2022-08-01 11:55:00|2022-08-01 12:00:00|
|A |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2 |1 |2022-08-01 12:00:00|2022-08-01 12:05:00|
|B |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2 |1 |2022-08-01 12:00:00|2022-08-01 12:05:00|
+----+------------------------------------------+---+---+-------------------+-------------------+

df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
.groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration=None, startTime=None)])\
.agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
.withColumn('window_start',F.col('window.start'))\
.withColumn('window_end',F.col('window.end'))\
.orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)

+----+------------------------------------------+---+---+-------------------+-------------------+
|name|window |cnt|dnt|window_start |window_end |
+----+------------------------------------------+---+---+-------------------+-------------------+
|A |[2022-08-01 11:55:00, 2022-08-01 12:00:00]|1 |1 |2022-08-01 11:55:00|2022-08-01 12:00:00|
|A |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2 |1 |2022-08-01 12:00:00|2022-08-01 12:05:00|
|B |[2022-08-01 12:00:00, 2022-08-01 12:05:00]|2 |1 |2022-08-01 12:00:00|2022-08-01 12:05:00|
+----+------------------------------------------+---+---+-------------------+-------------------+
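With slideDuration equal to windowDuration (or None), every timestamp falls into exactly one window, so finding its window reduces to rounding the timestamp down to a multiple of 5 minutes. A plain-Python sketch under the same assumption as before (naive datetimes treated as UTC, grid anchored at 1970-01-01); `tumbling_start` is a hypothetical helper.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive datetimes are treated as UTC in this sketch
WINDOW = timedelta(minutes=5)

def tumbling_start(ts, window):
    """Hypothetical helper: start of the single tumbling window [start, start + window) holding ts."""
    return ts - (ts - EPOCH) % window

clicks = [
    ("A", "2022-08-01 11:59:59"),
    ("A", "2022-08-01 12:02:00"),
    ("A", "2022-08-01 12:03:59"),
    ("B", "2022-08-01 12:00:10"),
    ("B", "2022-08-01 12:02:10"),
]

counts = Counter()
for name, text in clicks:
    ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    counts[(name, tumbling_start(ts, WINDOW))] += 1  # one window per click

for (name, start), cnt in sorted(counts.items()):
    print(name, start, start + WINDOW, cnt)
```

Because each click lands in only one window, the per-name counts add up to the number of clicks (3 for A, 2 for B), which is not true of the 1-minute sliding version.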

Using the startTime offset

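The windows above are aligned to minute 0, second 0. If you insist on starting from minute 1, how do you get windows such as 1-6, 6-11 and so on? Shift them with startTime: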

df_spark.withColumn('click_timestamp',F.to_timestamp(F.col('click_time')))\
.groupby(['name',F.window(timeColumn='click_timestamp', windowDuration='5 minute', slideDuration='5 minute', startTime='1 minutes')])\
.agg(F.count('click').alias('cnt'),F.countDistinct('click').alias('dnt'))\
.withColumn('window_start',F.col('window.start'))\
.withColumn('window_end',F.col('window.end'))\
.orderBy(F.col('name'),F.col('window_start').asc()).show(truncate=False)

+----+------------------------------------------+---+---+-------------------+-------------------+
|name|window |cnt|dnt|window_start |window_end |
+----+------------------------------------------+---+---+-------------------+-------------------+
|A |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1 |1 |2022-08-01 11:56:00|2022-08-01 12:01:00|
|A |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|2 |1 |2022-08-01 12:01:00|2022-08-01 12:06:00|
|B |[2022-08-01 11:56:00, 2022-08-01 12:01:00]|1 |1 |2022-08-01 11:56:00|2022-08-01 12:01:00|
|B |[2022-08-01 12:01:00, 2022-08-01 12:06:00]|1 |1 |2022-08-01 12:01:00|2022-08-01 12:06:00|
+----+------------------------------------------+---+---+-------------------+-------------------+
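startTime only moves the grid the windows sit on: the window start becomes the timestamp rounded down to 1970-01-01 + startTime + k * 5 minutes. The sketch below uses a hypothetical `tumbling_start` with an offset argument (naive datetimes treated as UTC) and gives the same windows and counts as the table above.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)   # naive datetimes are treated as UTC in this sketch
WINDOW = timedelta(minutes=5)
OFFSET = timedelta(minutes=1)  # plays the role of startTime='1 minutes'

def tumbling_start(ts, window, offset):
    """Hypothetical helper: start of the tumbling window holding ts, grid shifted by offset."""
    return ts - (ts - EPOCH - offset) % window

clicks = [
    ("A", "2022-08-01 11:59:59"),
    ("A", "2022-08-01 12:02:00"),
    ("A", "2022-08-01 12:03:59"),
    ("B", "2022-08-01 12:00:10"),
    ("B", "2022-08-01 12:02:10"),
]

counts = Counter()
for name, text in clicks:
    ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    counts[(name, tumbling_start(ts, WINDOW, OFFSET))] += 1

for (name, start), cnt in sorted(counts.items()):
    print(name, start, start + WINDOW, cnt)
```

With OFFSET set back to timedelta(0), the same code falls back to the 11:55 / 12:00 windows of the plain tumbling case.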

That covers the basic usage.

                                2022-08-03, Jiulonghu, Jiangning District, Nanjing

