0
点赞
收藏
分享

微信扫一扫

如何在Spark中使用动态数据转置

千白莫 2023-06-08 阅读 66


Dynamic Transpose是Spark中的一个关键转换,因为它需要大量的迭代。本文将为您提供有关如何使用内存中运算符处理此复杂方案的清晰概念。

首先,让我们看看我们拥有的源数据: 

idoc_number,订单ID,idoc_qualifier_org,idoc_org

7738,2364,6,0

7738,2364,7,0

7738,2364,8,mystr1

7738,2364,12,mystr2

7739,2365,12,mystr3

7739,2365,7,mystr4

我们还有idoc_qualifier_org 源数据记录中列的查找表  。由于查找表的大小会更小,我们可以预期它会在缓存中和驱动程序内存中。

预选赛,降序

6,司

7,分销渠道

8,销售组织

12,订单类型

Dynamic Transpose操作的预期输出是:

idoc_number,order_id,Division,Distribution Channel,Sales org,Order Type

7738,2364,0,0,mystr1,mystr2

7739,2365,空,mystr3,空,mystr4

以下代码实际上将根据数据中的当前列转置数据。此代码是使用Spark中的Transpose Data的另一种方法。此代码严格使用Spark的复杂数据类型,并且还负责迭代的性能。 

对象 DynamicTranspose {

def  dataValidator(map_val:Seq [ Map [ String,String ]],rule:String):String  = {

尝试 {

val  rule_array  =  规则。拆分(“#!”)。toList

val  src_map  =  map_val。toList。压扁。toMap

var  output_str  =  “”

rule_array。foreach(f  =>

output_str  =  output_str  +  “!”  +  src_map。getOrElse(f,“#”)


 


return  output_str。掉落(1)

} catch {

案例 t:

Throwable  =>  t。printStackTrace()。toString()

返回 “0”。toString()

}


 


}


 


def  main(args:Array [ String ]):Unit  = {


 


val  spark  =  SparkSession。builder()。主人(“本地[*]”)。config(“spark.sql.warehouse.dir”,“<src dir>”)。getOrCreate()

val  data_df  =  spark。读。选项(“标题”,“真”)。csv(“<data path src>”)

val  lkp_df  =  spark。读。选项(“标题”,“真”)。csv(“查找路径源>”)

进口 火花。暗示。_

进口 组织。阿帕奇。火花。sql。功能。广播


 


val  lkp_df_brdcast  =  broadcast(lkp_df)

val  result_df  =  data_df。加入(广播(lkp_df_brdcast),$  “idoc_qualifier_org”  ===  $  “限定符”,“内部”)


 


val  df1  =  result_df。groupBy(col(“idoc_number”),col(“orderid”))。agg(collect_list(map($  “desc”,$  “idoc_org”))as  “map”)

进口 组织。阿帕奇。火花。sql。功能。UDF

进口 组织。阿帕奇。火花。sql。功能。{

点燃,

最大,

ROW_NUMBER

}

进口 火花。暗示。_

进口 组织。阿帕奇。火花。sql。行

val  map_val  =  lkp_df。rdd。地图(行 =>  行。的getString(1))。收集()。mkString(“#!”)

火花。sparkContext。广播(map_val)

VAL  recdValidator  =  UDF(dataValidator  _)

var  latest_df  =  df1。withColumn(“explode_out”,split(recdValidator(df1(“map”),lit(map_val)),“!”))。掉落(“地图”)

val  columns  =  map_val。拆分(“#!”)。toList

latest_df  =  列。zipWithIndex。foldLeft(latest_df){

(memodDF,专栏)=> {

memodDF。withColumn(柱。_1,山口(“explode_out” )(柱。_2))

}

}

。drop(“explode_out”)

latest_df。show()

}


 


}

希望这可以帮助!

举报

相关推荐

0 条评论