0
点赞
收藏
分享

微信扫一扫

Flink分区策略

霍华德 2022-03-12 阅读 101

分区策略决定了一条数据会被发送给下游算子的哪一个(或哪些)并行实例。Flink 提供了八种分区策略:global、shuffle、rebalance、rescale、broadcast、forward、hash(keyBy)以及自定义(custom)。

RescalePartitioner 的源码注释(大意):上游每个实例只会以轮询方式把数据发送给下游实例的一个子集,子集的划分取决于上下游的并行度。例如上游并行度为 2、下游并行度为 4 时,每个上游实例各自轮询发送给其中 2 个下游实例。

 

代码demo:

 

package Flink_API;

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;

/**
 * Demo of Flink's stream partitioning strategies.
 *
 * <p>Reads JSON browse events from the Kafka topic {@code browse_topic}. It parses them into
 * {@link UserBrowseLog} with an operator of parallelism 2, then repartitions the parsed stream
 * into a print sink of parallelism 10.
 *
 * <p>Only the custom partitioner (strategy 8) is active. The other strategies are kept as
 * commented-out alternatives that can be swapped in one at a time.
 */
public class TestPartitiner {
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the timestamp carried by each record (event time) instead of processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Default parallelism for every operator that does not set its own.
        env.setParallelism(1);

        Properties consumerProperties = new Properties();
        // Kafka client property keys must be spelled exactly; unknown keys are ignored.
        consumerProperties.setProperty("bootstrap.servers", "page01:9001");
        consumerProperties.setProperty("group.id", "browsegroup");
        // SimpleStringSchema is a DeserializationSchema, not a KeyedDeserializationSchema.
        // Pass it straight to the DeserializationSchema constructor; a cast fails at runtime.
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                UserBrowseLog browseLog;
                try {
                    browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                } catch (Exception e) {
                    // Best effort: skip a malformed record instead of failing the job. Only the
                    // parse is guarded, so errors from downstream operators still propagate.
                    System.out.println("解析Json——UserBrowseLog异常:" + e.getMessage());
                    return;
                }
                if (browseLog != null) {
                    collector.collect(browseLog);
                }
            }
        }).setParallelism(2).name("processData");

        // The upstream operator has parallelism 2; the downstream print sink has parallelism 10.
//        // 1. global: every record goes to the first downstream subtask (index 0).
//        processData.global().print().setParallelism(10).name("print");
//        // 2. shuffle: each record goes to a randomly chosen downstream subtask.
//        processData.shuffle().print().setParallelism(10).name("print");
//        // 3. rebalance: round-robin over all downstream subtasks.
//        processData.rebalance().print().setParallelism(10).name("print");
//        // 4. rescale: round-robin over a subset of downstream subtasks, the subset being
//        //    determined by the upstream and downstream parallelism.
//        processData.rescale().print().setParallelism(10).name("print");
//        // 5. broadcast: every record goes to every downstream subtask.
//        processData.broadcast().print().setParallelism(10).name("print");
//        // 6. forward: record goes to the downstream subtask with the same index. This requires
//        //    equal parallelism, so Flink rejects it for 2 -> 10.
//        processData.forward().print().setParallelism(10).name("print");
//        // 7. hash (keyBy): records are routed by the hash of their key.
//        processData.keyBy("userID").print().setParallelism(10).name("print");
        // 8. custom: records are routed by CustomPartitioner applied to the userID field.
        processData.partitionCustom(new CustomPartitioner(), "userID").print().setParallelism(10).name("print");
        // Also print the parsed records without repartitioning (uses the default parallelism of 1).
        processData.print();

        // Build and submit the job under the name "TestPartitiner".
        env.execute("TestPartitiner");

    }

    /**
     * Sends key {@code "user_1"} to partition 0 and every other key to partition 1.
     *
     * <p>"Every other key" includes a {@code null} userID. When the downstream operator has only
     * one partition, all keys go to partition 0, so the returned index is always in range.
     */
    public static class CustomPartitioner implements Partitioner<String> {

        /**
         * @param key the userID of the record; may be {@code null}
         * @param numPartitions number of downstream partitions (at least 1)
         * @return the target partition index in {@code [0, numPartitions)}
         */
        @Override
        public int partition(String key, int numPartitions) {
            // Constant-first equals so a record without a userID does not throw NPE.
            if ("user_1".equals(key)) {
                return 0;
            }
            // Partition 1 when it exists; fall back to 0 when there is a single partition.
            return 1 % numPartitions;
        }
    }

    /**
     * One browse event parsed from the Kafka JSON payload.
     *
     * <p>Public class with an implicit no-arg constructor and getters/setters. The field name
     * {@code userID} is referenced by the field expression passed to {@code partitionCustom}.
     */
    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() {
            return userID;
        }

        public void setUserID(String userID) {
            this.userID = userID;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }

        public String getEventType() {
            return eventType;
        }

        public void setEventType(String eventType) {
            this.eventType = eventType;
        }

        public String getProductID() {
            return productID;
        }

        public void setProductID(String productID) {
            this.productID = productID;
        }

        public Integer getProductPrice() {
            return productPrice;
        }

        public void setProductPrice(Integer productPrice) {
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }


}
举报

相关推荐

0 条评论