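import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, floor, row_number}
// Number the rows within each the_id starting from 0, then cut them into buckets of 10.
inputDF.withColumn("row_number",
    row_number().over(Window.partitionBy("the_id").orderBy("the_id")) - 1)
  // floor keeps bucket integral (LongType); a bare "/" yields a Double (0.0, 0.1, ...), so rows would not share buckets.
  .withColumn("bucket", floor(col("row_number") / 10))
  .rdd.groupBy(row => row.getAs[String]("the_id") + "---" + row.getAs[Long]("bucket"))
  .repartition(10000)
  .map(pair => {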
So in effect, this still comes down to the second-groupBy approach.
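
To make the "second groupBy" idea concrete, here is a minimal sketch in plain Scala collections rather than Spark, so it runs without a cluster. The object name `TwoStageGroupBy`, the sample rows, the bucket size of 10, and the use of a sum as the aggregate are all assumptions for illustration; the original does not say what the per-group computation is.

```scala
object TwoStageGroupBy {
  // Hypothetical sample data: (the_id, value) pairs, where "a" is the oversized key.
  val rows: Seq[(String, Int)] = Seq.fill(25)(("a", 1)) ++ Seq.fill(3)(("b", 1))

  // Assumed bucket size, matching the "/ 10" in the snippet above.
  val bucketSize = 10

  // Stage 1: number the rows within each key, derive a bucket from that index,
  // and aggregate per (the_id, bucket). No group holds more than bucketSize rows.
  val partials: Map[(String, Int), Int] =
    rows.groupBy(_._1).toSeq.flatMap { case (id, group) =>
      group.zipWithIndex.map { case ((_, value), idx) => ((id, idx / bucketSize), value) }
    }.groupBy(_._1).map { case (key, values) => key -> values.map(_._2).sum }

  // Stage 2: group the partial results by the_id alone and combine them.
  val totals: Map[String, Int] =
    partials.toSeq.groupBy(_._1._1).map { case (id, ps) => id -> ps.map(_._2).sum }
}
```

This only works as written when the aggregate can be combined from partial results (sums, counts, min/max and the like). Anything that needs all rows of a key at once would need a different second stage.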