1.需要的依赖
使用JDBCAppendTableSink方法创建mysqlsink写入,数据是追加到数据库的
//创建流处理执行环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
environment.setParallelism(2);
//加载自定义数据形成数据流
DataStreamSource<Person> personSource = environment.addSource(new DataDB());
//转换为Row类型才能写入mysql
DataStream<Row> personRow = personSource.map(new MapFunction<Person, Row>() {
@Override
public Row map(Person person) throws Exception {
Row row = new Row(4);
row.setField(0, person.getPid());
row.setField(1, person.getPname());
row.setField(2, person.getPsex());
row.setField(3, person.getPage());
return row;
}
});
//定义调用JDBCAppendTableSink创建mysqlSink
JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
//驱动程序
.setDrivername("com.mysql.cj.jdbc.Driver")
//连接URL地址
.setDBUrl("jdbc:mysql://localhost:3306/school?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC")
//用户名
.setUsername("root")
//密码
.setPassword("1234")
//写入类型
.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO)
//写入语句
.setQuery("insert into person values (?,?,?,?);")
//一次批量写入条数
.setBatchSize(10)
.build();
//调用mysqlsink写入mysql数据
tableSink.emitDataStream(personRow);
//执行程序
environment.execute();
如果出现以下报错,
需要在url地址里面添加如下代码