分区策略决定了一条数据如何发给下游,Flink中提供八大分区策略。
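RescalePartitioner 说明:rescale 按上下游并行度的比例,将每个上游子任务的数据以轮询方式发送给下游子任务的一个子集(例如上游并行度为 2、下游并行度为 10 时,每个上游子任务只轮询其对应的 5 个下游子任务),而不是像 rebalance 那样轮询所有下游子任务。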
代码demo:
package Flink_API;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.Properties;
public class TestPartitiner {
/**
 * Demo entry point: consumes JSON strings from the Kafka topic {@code browse_topic}, parses
 * them into {@link UserBrowseLog} records, and sends them to a print sink with parallelism 10
 * through a selectable partitioning strategy (the custom partitioner is the active one).
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job cannot be built or its execution fails
 */
public static void main(String[] args) throws Exception {
    // Create the execution environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Use event time, i.e. the timestamps carried by the records themselves.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Default parallelism for every operator that does not override it below.
    env.setParallelism(1);

    Properties consumerProperties = new Properties();
    // The Kafka client only recognises the exact keys "bootstrap.servers" and "group.id";
    // misspelled keys are ignored and the consumer is left without brokers / a group.
    consumerProperties.setProperty("bootstrap.servers", "page01:9001");
    consumerProperties.setProperty("group.id", "browsegroup");

    // SimpleStringSchema is a DeserializationSchema, not a KeyedDeserializationSchema, so it is
    // passed as-is; casting it to KeyedDeserializationSchema fails with a ClassCastException.
    DataStreamSource<String> dataStreamSource = env.addSource(
            new FlinkKafkaConsumer010<String>("browse_topic", new SimpleStringSchema(), consumerProperties));

    // Parse each JSON line into a UserBrowseLog; records that fail to parse are reported and dropped.
    DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
        @Override
        public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
            try {
                UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                if (browseLog != null) {
                    collector.collect(browseLog);
                }
            } catch (Exception e) {
                // Best effort: a malformed record must not fail the job. One line per error,
                // on stderr, so consecutive messages do not run together.
                System.err.println("解析Json——UserBrowseLog异常:" + e.getMessage());
            }
        }
    }).setParallelism(2).name("processData");

    // Upstream operator runs with parallelism 2, downstream sink with parallelism 10.
    // Alternative strategies, kept for reference (enable one at a time):
    // // 1. global: every record goes to the first downstream subtask
    // processData.global().print().setParallelism(10).name("print");
    // // 2. shuffle: records are distributed randomly
    // processData.shuffle().print().setParallelism(10).name("print");
    // // 3. rebalance: round-robin over all downstream subtasks
    // processData.rebalance().print().setParallelism(10).name("print");
    // // 4. rescale: round-robin over the subset of downstream subtasks assigned to each upstream subtask
    // processData.rescale().print().setParallelism(10).name("print");
    // // 5. broadcast: every record is sent to all downstream subtasks
    // processData.broadcast().print().setParallelism(10).name("print");
    // // 6. forward
    // // NOTE(review): forward is expected to require equal upstream/downstream parallelism,
    // // so 2 -> 10 would likely be rejected - confirm before enabling.
    // processData.forward().print().setParallelism(10).name("print");
    // // 7. hash: partition by the hash of the key field
    // processData.keyBy("userID").print().setParallelism(10).name("print");

    // 8. custom: route by userID using CustomPartitioner
    processData.partitionCustom(new CustomPartitioner(), "userID").print().setParallelism(10).name("print");

    // Also print the parsed stream without repartitioning.
    processData.print();

    // Submit the job; nothing runs until execute() is called.
    env.execute("TestPartitiner");
}
/**
 * Custom partitioner keyed on the user ID: records whose key is {@code "user_1"} go to
 * partition 0, every other key goes to partition 1 (or 0 when only one partition exists).
 */
public static class CustomPartitioner implements Partitioner<String> {
    /**
     * Chooses the downstream partition for a key.
     *
     * @param s the partitioning key; may be {@code null}
     * @param i the number of downstream partitions
     * @return 0 for {@code "user_1"}; otherwise 1, capped to 0 when fewer than two partitions exist
     */
    @Override
    public int partition(String s, int i) {
        // Debug output kept from the original demo: prints the number of partitions per record.
        System.out.print(i);
        // Constant-first equals so a record without a key does not throw a NullPointerException.
        if ("user_1".equals(s)) {
            return 0;
        }
        // Index 1 is only valid when there are at least two downstream partitions.
        return i > 1 ? 1 : 0;
    }
}
/**
 * A single user browse event, populated from JSON via its setters.
 * All fields are nullable; none are validated here.
 */
public static class UserBrowseLog implements Serializable {
    private String userID;
    private String eventTime;
    private String eventType;
    private String productID;
    private Integer productPrice;

    /** @return the user identifier, or {@code null} if unset */
    public String getUserID() {
        return this.userID;
    }

    /** @param userID the user identifier */
    public void setUserID(String userID) {
        this.userID = userID;
    }

    /** @return the event time as stored, or {@code null} if unset */
    public String getEventTime() {
        return this.eventTime;
    }

    /** @param eventTime the event time, stored as given */
    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }

    /** @return the event type, or {@code null} if unset */
    public String getEventType() {
        return this.eventType;
    }

    /** @param eventType the event type */
    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    /** @return the product identifier, or {@code null} if unset */
    public String getProductID() {
        return this.productID;
    }

    /** @param productID the product identifier */
    public void setProductID(String productID) {
        this.productID = productID;
    }

    /** @return the product price, or {@code null} if unset */
    public Integer getProductPrice() {
        return this.productPrice;
    }

    /** @param productPrice the product price */
    public void setProductPrice(Integer productPrice) {
        this.productPrice = productPrice;
    }

    /**
     * Renders all fields in declaration order; string fields are wrapped in single quotes
     * and unset fields appear as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserBrowseLog{");
        text.append("userID='").append(userID).append('\'');
        text.append(", eventTime='").append(eventTime).append('\'');
        text.append(", eventType='").append(eventType).append('\'');
        text.append(", productID='").append(productID).append('\'');
        text.append(", productPrice=").append(productPrice);
        text.append('}');
        return text.toString();
    }
}
}