0
点赞
收藏
分享

微信扫一扫

Spark SQL 2.x 版本的 DataFrame 和 Dataset

月孛星君 2022-02-17 阅读 90


package sql2

import org.apache.spark.sql.SparkSession

/**
 * Demonstrates two ways to join two datasets with Spark SQL 2.x:
 * (1) registering temporary views and joining via a SQL string, and
 * (2) joining directly through the DataFrame API.
 */
object Spark2Join {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("joinTest")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Raw user records: "id,name,nationCode".
    val lines = spark.createDataset(List("1,laozhoa,china", "2,laoduan,usa", "3,laoyang,jp"))

    // Parse each CSV line into a typed (id, name, nationCode) tuple.
    val userTuples = lines.map { line =>
      line.split(",") match {
        case Array(id, name, nationCode) => (id.toLong, name, nationCode)
      }
    }
    val df1 = userTuples.toDF("id", "name", "nation")

    // Raw nation records: "englishName,chineseName". Note "jp" has no
    // translation here, which is why a left join is used below.
    val nations = spark.createDataset(List("china,中国", "usa,美国"))

    // Parse each nation line into an (ename, cname) tuple.
    val nationTuples = nations.map { entry =>
      entry.split(",") match {
        case Array(ename, cname) => (ename, cname)
      }
    }
    val df2 = nationTuples.toDF("ename", "cname")

    // Option 1: register both DataFrames as temporary views and join
    // them with a plain SQL statement.
    df1.createTempView("v_users")
    df2.createTempView("v_nations")

    val joined = spark.sql("select name,cname from v_users left join v_nations on nation = ename")
    joined.show()

    // Option 2: join through the DataFrame API. The API defaults to an
    // inner join, so "left_outer" is requested explicitly to keep users
    // whose nation has no match.
    df1.join(df2, $"nation" === $"ename", "left_outer").show()

    spark.stop()
  }
}



举报

相关推荐

0 条评论