0
点赞
收藏
分享

微信扫一扫

SparkStreaming常见案例

updateStateByKey算子

需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态)



java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().





需求:将统计结果写入到MySQL

create table wordcount(

word varchar(50) default null,

wordcount int(10) default null

);



通过该sql将统计结果写入到MySQL

insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"



存在的问题:

1) 对于已有的数据做更新,而是所有的数据均为insert

改进思路:

a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert

b) 工作中:HBase/Redis



2) 每个rdd的partition创建connection,建议大家改成连接池









window:定时的进行一个时间段内的数据处理



window length : 窗口的长度

sliding interval: 窗口的间隔



这2个参数和我们的batch size有关系:倍数



每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc

==> 每隔sliding interval统计前window length的值













黑名单过滤



访问日志 ==> DStream

20180808,zs

20180808,ls

20180808,ww

==> (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)



黑名单列表 ==> RDD

zs

ls

==>(zs: true)(ls: true)







==> 20180808,ww



leftjoin

(zs: [<20180808,zs>, <true>]) x

(ls: [<20180808,ls>, <true>]) x

(ww: [<20180808,ww>, <false>]) ==> tuple 1

举报

相关推荐

0 条评论