0
点赞
收藏
分享

微信扫一扫

Flink实时写入Mysql数据

萨摩斯加士奇 2022-01-27 阅读 158

1.需要的依赖

使用JDBCAppendTableSink方法创建mysqlsink写入,数据是追加到数据库的

 //创建流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        environment.setParallelism(2);
        //加载自定义数据形成数据流
        DataStreamSource<Person> personSource = environment.addSource(new DataDB());
        //转换为Row类型才能写入mysql
        DataStream<Row> personRow = personSource.map(new MapFunction<Person, Row>() {
            @Override
            public Row map(Person person) throws Exception {
                Row row = new Row(4);
                row.setField(0, person.getPid());
                row.setField(1, person.getPname());
                row.setField(2, person.getPsex());
                row.setField(3, person.getPage());
                return row;
            }
        });
        //定义调用JDBCAppendTableSink创建mysqlSink
        JDBCAppendTableSink tableSink = JDBCAppendTableSink.builder()
                //驱动程序
                .setDrivername("com.mysql.cj.jdbc.Driver")
                //连接URL地址
                .setDBUrl("jdbc:mysql://localhost:3306/school?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC")
                //用户名
                .setUsername("root")
                //密码
                .setPassword("1234")
                //写入类型
                .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    BasicTypeInfo.INT_TYPE_INFO)
                //写入语句
                .setQuery("insert into person values (?,?,?,?);")
                //一次批量写入条数
                .setBatchSize(10)
                .build();
        //调用mysqlsink写入mysql数据
        tableSink.emitDataStream(personRow);
        //执行程序
        environment.execute();

如果出现以下报错,

需要在url地址里面添加如下代码

举报

相关推荐

0 条评论