Table of Contents
- Writing to MySQL
- Writing Parquet files
- Writing text files
Writing to MySQL
import org.apache.spark.sql.SaveMode

val df7_1 = spark.createDataFrame(List(
  ("Alice", "Female", "20"),
  ("Tom", "Male", "25"),
  ("Boris", "Male", "18"))).toDF("name", "sex", "age")

// JDBC connection credentials for the target MySQL instance.
val properties = new java.util.Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")

// Append the DataFrame to table t_user in database syllabus.
df7_1.write.mode(SaveMode.Append).jdbc("jdbc:mysql://linux01:3306/syllabus", "t_user", properties)
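To check the result, the table can be read back with the matching JDBC read API. A minimal sketch, reusing the connection settings above:

// Read t_user back to confirm the rows were appended.
val readBack = spark.read.jdbc("jdbc:mysql://linux01:3306/syllabus", "t_user", properties)
readBack.show()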
There are four save modes for writing data.
Source code (DataFrameWriter):
def mode(saveMode: String): DataFrameWriter[T] = {
  this.mode = saveMode.toLowerCase(Locale.ROOT) match {
    case "overwrite" => SaveMode.Overwrite
    case "append" => SaveMode.Append
    case "ignore" => SaveMode.Ignore
    case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
    case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      "Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
  }
  this
}
- overwrite: if the table exists, drop it and write the data fresh; if it does not exist, create it.
- append: if the table exists, append the data to it; if not, create it and write.
- ignore: if the table does not exist, create it and write; if it exists, silently skip the write.
- errorifexists (the default): if the table already exists, throw an exception.
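As the source above shows, these mode names can also be passed as case-insensitive strings, which map to the same SaveMode constants. A minimal sketch against the same table, with the same connection settings:

// Equivalent string-based calls; each maps to a SaveMode constant.
df7_1.write.mode("overwrite").jdbc("jdbc:mysql://linux01:3306/syllabus", "t_user", properties)
// With "ignore", this second write is a no-op because t_user now exists.
df7_1.write.mode("ignore").jdbc("jdbc:mysql://linux01:3306/syllabus", "t_user", properties)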
Writing Parquet files
val df7_2 = spark.createDataFrame(List(
  ("Alice", "Female", "20"),
  ("Tom", "Male", "25"),
  ("Boris", "Male", "18"))).toDF("name", "sex", "age")

// Repartition to a single partition so the output is one Parquet file, then save to HDFS.
df7_2.repartition(1).write.format("parquet").save("hdfs://linux01:8020/spark/chapter7/data/parquet")
Parquet is the default format Spark uses when loading and saving data.
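Because Parquet is the default source, the saved data can be read back without naming a format. A minimal sketch, reading the path written above:

// load() without format(...) falls back to the default source, which is Parquet.
val parquetDF = spark.read.load("hdfs://linux01:8020/spark/chapter7/data/parquet")
parquetDF.show()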
Writing text files
val df7_3 = spark.createDataFrame(List(
  ("Alice", "Female", "20"),
  ("Tom", "Male", "25"),
  ("Boris", "Male", "18"))).toDF("name", "sex", "age")

// Write the same data out as JSON and as CSV, one output file each.
df7_3.repartition(1).write.json("hdfs://linux01:8020/spark/chapter7/data/json")
df7_3.repartition(1).write.csv("hdfs://linux01:8020/spark/chapter7/data/csv")
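JSON and CSV are structured text formats. Strictly plain-text output goes through write.text, which accepts only a single string column, so multiple columns must be concatenated first. A minimal sketch, assuming the df7_3 above (the /text output path is illustrative):

import org.apache.spark.sql.functions.{col, concat_ws}

// text() requires exactly one string column, so join the columns with a delimiter first.
df7_3.select(concat_ws(",", col("name"), col("sex"), col("age")).as("value"))
  .repartition(1)
  .write.text("hdfs://linux01:8020/spark/chapter7/data/text")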