输出到MySQL(JDBC)
写入数据的MySQL的测试步骤如下。
 (1)添加依赖
 添加MySQL驱动:
 mysql
 mysql-connector-java
 8.0.27
 官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:
 apache-snapshots
 apache snapshots
 https://repository.apache.org/content/repositories/snapshots/
 添加依赖:
 org.apache.flink
 flink-connector-jdbc
 1.17-SNAPSHOT
 如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:
 aliyunmaven
 *,!apache-snapshots
 阿里云公共仓库
 https://maven.aliyun.com/repository/public
 (2)启动MySQL,在test库下建表ws
mysql>     
CREATE TABLE `ws` (
  `id` varchar(100) NOT NULL,
  `ts` bigint(20) DEFAULT NULL,
  `vc` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8(3)编写输出到MySQL的示例代码
public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());
        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 重试次数
                        .withBatchSize(100) // 批次的大小:条数
                        .withBatchIntervalMs(3000) // 批次的时间
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("000000")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );
        sensorDS.addSink(jdbcSink);
        env.execute();
    }
}(4) 运行代码,用客户端连接MySQL,查看是否成功写入数据。                








