0
点赞
收藏
分享

微信扫一扫

大数据硬核技能进阶:Spark3实战智能物业运营系统

参考链接

Spark编程: Spark SQL基本操作 2020.11.01_df.agg("age"->"avg")-CSDN博客

RDD编程初级实践-CSDN博客

Spark和Hadoop的安装-CSDN博客

1. Spark SQL基本操作

{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }

创建employee.json文件

sudo vim employee.json
cat employee.json

启动spark-shell

cd /usr/local/spark/
./bin/spark-shell

1.1  查询所有数据

import spark.implicits._
val df=spark.read.json("file:home/hadoop/下载/employee.json")
df.show()

import spark.implicits._是Spark的一个工具,帮助 我们将RDD 转换为DataFrame。

spark.read.json是 Apache Spark 中的一个方法,用于从 JSON 文件中读取数据并将其加载到 DataFrame 中。

df.show()用于显示DataFrame中的内容。

1.2  查询所有数据,并去除重复的数据

df.distinct().show()

 distinct()去重。

 

1.3  查询所有数据,打印时去除id字段

df.drop(df("id")).show()

 df.drop()用于删除DataFrame中指定的列。

 

1.4  筛选出age>30的记录

df.filter(df("age")>30).show()

 df.filter()用于根据指定条件过滤DataFrame中的行。

1.5  将数据按age分组

df.groupBy(df("age")).count.show()

df.groupBy()用于根据指定的列对DataFrame进行分组。

df.count().show()用于显示分组后的DataFrame的内容。

1.6  将数据按name升序排列

df.sort(df("name").asc).show()

df.sort()用于对DataFrame中的行进行排序(默认升序)。

升序asc

降序desc

这里“Ella”比“Bob”小是因为“Ella”字符串实际上是“ Ella”,所以他的第一个字符不是‘E’而是‘ ’,对应的ASCII,‘E’是69,‘B’是66,‘ ’是32.

 

1.7  取出前3行数据

df.show(3)

 df.show(n)用于显示DataFrame的前n行。(n超出后会打印原始的大小)

1.8  查询所有记录的name列,并为其取别名为username

df.select(df("name").as("username")).show()

 df.select()用于选择DataFrame中指定的列。

 

1.9  查询年龄age的平均值

df.agg("age"->"avg").show()

 df.agg()用于对DataFrame进行聚合操作。

avg平均。

1.10 查询年龄age的最小值

df.agg("age"->"min").show()

min最小。 

2.编程实现将RDD转换为DataFrame

1,Ella,36
2,Bob,29
3,Jack,29

创建项目

sudo mkdir -p /example/sparkapp6/src/main/scala
cd /example/sparkapp6/src/main/scala

  创建employee.txt

sudo vim employee.txt 
cat employee.txt

创建Scala文件

sudo vim ./SimpleApp.scala

 

 创建.sbt文件

sudo vim build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

 打包执行

/usr/local/sbt/sbt package
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar

3.编程实现利用DataFrame读写MySQL的数据

3.1 在MySQL数据库中新建数据库sparktest,再创建表employee

3.2 配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入

举报

相关推荐

0 条评论