Reading a JDBC data source
package cn.edu360.day7

import java.util.Properties

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Created by zx on 2017/5/13.
  */
object JdbcDataSource {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Does load() actually read the data from MySQL at this point?
    val logs: DataFrame = spark.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://localhost:3306/bigdata",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "logs",
        "user" -> "root",
        "password" -> "123568")
    ).load()

    //logs.printSchema()
    //logs.show()

    // Filtering with a lambda (typed API): the column value is pulled out row by row.
    //val filtered: Dataset[Row] = logs.filter(r => {
    //  r.getAs[Int]("age") <= 13
    //})
    //filtered.show()

    // Filtering with column expressions (untyped API); where() is an alias of filter().
    val r = logs.filter($"age" <= 13)
    //val r = logs.where($"age" <= 13)

    val result: DataFrame = r.select($"id", $"name", $"age" * 10 as "age")

    //val props = new Properties()
    //props.put("user", "root")
    //props.put("password", "123568")
    //result.write.mode("ignore").jdbc("jdbc:mysql://localhost:3306/bigdata", "logs1", props)

    // Writing a DataFrame as text fails here: the text sink supports only a single column.
    //result.write.text("/Users/zx/Desktop/text")
    //result.write.json("/Users/zx/Desktop/json")
    //result.write.csv("/Users/zx/Desktop/csv")
    //result.write.parquet("hdfs://node-4:9000/parquet")

    //result.show()

    spark.close()
  }
}
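On the question in the comment above: load() does not pull the table's rows. As far as I can tell, the JDBC source only issues a lightweight schema probe (effectively SELECT * FROM logs WHERE 1=0) when load() runs; the actual rows are fetched from MySQL only when an action such as show() executes, and simple predicates like age <= 13 are pushed down into the SQL Spark generates. A minimal sketch to see this for yourself:

// At this point logs is just a lazy plan plus the schema resolved by the probe query.
logs.printSchema()

// explain() prints the physical plan without running the job; for a JDBC scan
// the output should list the age predicate under PushedFilters, meaning it is
// evaluated inside MySQL rather than in Spark.
logs.filter($"age" <= 13).explain()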
result.write.mode("ignore"): mode accepts four values:
overwrite: overwrite the existing data
append: append to the existing data
ignore: if the target already contains data, do nothing (the write is silently skipped)
error (the default): throw an exception if the target already contains data
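The same modes can also be set type-safely with the SaveMode enum instead of strings. A minimal sketch, reusing the connection details from the commented-out code above:

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "root")
props.put("password", "123568")

// SaveMode.Append adds rows if logs1 already exists and creates the table otherwise;
// swap in SaveMode.Overwrite / SaveMode.Ignore / SaveMode.ErrorIfExists for the others.
result.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://localhost:3306/bigdata", "logs1", props)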
Writing parquet and csv works the same way and needs no special notes.
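One small csv option is worth knowing, though; a sketch using the paths from the commented-out writes above:

// Write a header row so the csv files are self-describing;
// without it the columns read back as _c0, _c1, ...
result.write.option("header", "true").csv("/Users/zx/Desktop/csv")

// Parquet stores the schema with the data, so it reads back without any options.
val back: DataFrame = spark.read.parquet("hdfs://node-4:9000/parquet")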
df.toDF("col1", "col2"): renames the columns, one new name per existing column, in order.
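For example, renaming the columns of the result DataFrame from the code above (the new names here are purely illustrative):

// toDF renames positionally: the argument count must match the column count.
val renamed: DataFrame = result.toDF("id", "name", "age_x10")
renamed.printSchema()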