0
点赞
收藏
分享

微信扫一扫

Spark:shuffle key中包含rand问题

生态人 04-16 12:30 阅读 2

在shuffle的key中包含rand,一般有两种使用场景:

1、distribute by rand,将数据随机打散

2、rand出现在join的on条件中,避免数据倾斜

负面影响:如果Spark的shuffle条件中包含rand,rand的shuffle阶段发生fetch fail,有可能引起数据错误。

因此,在处理数据倾斜时将热点key打散需要注意:尽量不要在join时,对关联key使用rand()函数。因为在hive中当遇到map失败重算时,就会出现数据重复(数据丢失)的问题,spark引擎使用rand容易导致task失败重新计算的时候偶发不一致的问题。 可以使用md5加密唯一维度值的方式替代rand(), 比如: md5(concat(coalesce(sku_id, 0), ‘’, coalesce(dim_store_num, 0), ‘’, coalesce(store_id, 0), ‘_’,coalesce(delv_center_id, 0))),其中concat的字段是表的唯一粒度;也可以使用hash。

修改SQL解析

出现问题的原因是:作业在进行的shuffle的时候,同一行数据,shuffle的结果不是幂等的。如果shuffle的mapper task由于失败重算,就有可能导致shuffle的数据分配错误。

修改的目标就是:

  1. 对于同一行数据,需要shuffle的结果是幂等的。
  2. 具体的方式:把shuffle的key与数据中确定的列绑定。

比如原本是想通过rand随机分散到20个分区里面, distribute by cast(rand(11)*20 as int) 可以修改成

distribute by rand

1、用一个整数的id对20取模
distribute by id % 20
2、 用任意一个类型字段的hash,然后对20取模
distribute by abs(hash(col) % 20)
3、避免某个字段倾斜,多考虑几个字段,降低倾斜的概率
distribute by abs(hash(col1, col2, …) % 20)

举报

相关推荐

0 条评论