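Preface
This discussion assumes a development environment in which you write and run the relevant code yourself.
The scenario: in the HelloFlink project, the driver code of Test01WordCount is modified, but HelloFlink-0.0.1.jar is not rebuilt.
We then look at what changes when Test01WordCount is run from IDEA (right-click, Run).
Test case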
package com.hx.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * com.hx.test.Test01WordCount
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2021/4/12 10:14
 */
public class Test01WordCount {

    // com.hx.test.Test01WordCount
    // VM options: -Xmx100M -XX:+UseSerialGC -XX:+TraceClassUnloading
    public static void main(String[] args) throws Exception {
        // ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The jar shipped to the cluster: the TaskExecutor loads user classes such as
        // MyFlatMapMapper from this jar, not from the IDE's compiled classes
        String jarPath = "D:\\IdeaWorkStations\\HelloFlink\\target\\HelloFlink-0.0.1.jar";
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, jarPath);
        env.setParallelism(1);

        String inputPath = "D:\\IdeaWorkStations\\HelloFlink\\src\\main\\resources\\Test01WordCount.txt";
        DataSource<String> inputDs = env.readTextFile(inputPath);
        DataSet<Tuple2<String, Integer>> result = inputDs
                .flatMap(new MyFlatMapMapper())
                .map(new MyMapMapper())
                .groupBy(0)
                .sum(1);
        // print() submits the job and collects the result back to the driver
        result.print();

        System.gc();
        System.in.read();
        System.out.println(" end ");
    }

    /**
     * MyFlatMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:24
     */
    private static class MyFlatMapMapper implements FlatMapFunction<String, String> {
        // Static data held by the class: the same log file read 10 times
        private static List<byte[]> dummyBytes = new ArrayList<>();

        static {
            try {
                for (int i = 0; i < 10; i++) {
                    byte[] tmpBytes = Files.readAllBytes(Paths.get("D:\\IdeaWorkStations\\HelloFlink\\target\\logs\\ROOT.2021-12-27-9.log"));
                    dummyBytes.add(tmpBytes);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // A long-running thread started from the static initializer
                new Thread(new MyRunnable()).start();
            }
        }

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            String[] splits = line.split("\\s+");
            for (String split : splits) {
                out.collect(split);
            }
        }
    }

    /**
     * MyRunnable
     *
     * @author Jerry.X.He
     * @date 2021/12/27 16:16
     */
    public static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.err.println(" MyRunnable.run before ");
            try {
                // Keep this thread alive for about 1000 seconds
                Thread.sleep(1000_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.err.println(" MyRunnable.run after ");
        }
    }

    /**
     * MyMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:29
     */
    private static class MyMapMapper implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    }
}
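Modifying the code in MyFlatMapMapper.flatMap
Add the line "Thread.sleep(3_000)" to MyFlatMapMapper.flatMap, then run Test01WordCount from IDEA again.
You will find that the added line has no effect at all.
Recall the "how processing functions are passed" flow discussed earlier. On the way from the Driver to the JobManager and on to the TaskExecutor, two things are transferred. The first is the JobGraph, which the client generates from the context of the driver code. The second is "HelloFlink-0.0.1.jar".
The task configuration (TaskConfig) carries a serialized instance of MyFlatMapMapper, not its code. The TaskExecutor loads the MyFlatMapMapper class from HelloFlink-0.0.1.jar and deserializes the instance against that class. The flatMap that actually runs is therefore MyFlatMapMapper.flatMap from HelloFlink-0.0.1.jar. That is the old code, without "Thread.sleep(3_000)".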
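The stack trace later in this post shows the TaskExecutor restoring the function through InstantiationUtil.deserializeObject, which ends in a plain ObjectInputStream.readObject. The following is a minimal sketch using only plain Java serialization, not Flink's own code, of what such a byte stream does and does not contain. It uses a hypothetical Splitter class as a stand-in for MyFlatMapMapper. The class name, field names and field values are in the stream, but the method code is not. The receiving JVM therefore has to supply the class itself, which is exactly where the stale jar comes in.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class StubShippingDemo {

    // Hypothetical stand-in for MyFlatMapMapper: one field (state) and one method (code).
    static class Splitter implements Serializable {
        private static final long serialVersionUID = 1L;
        String separator = "\\s+";

        String[] apply(String line) {
            return line.split(separator);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        // The class is resolved by name on the reading side; its bytecode is not in the stream.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    // Naive substring search over the raw bytes (ASCII needles only).
    static boolean contains(byte[] haystack, String needle) {
        byte[] n = needle.getBytes(StandardCharsets.US_ASCII);
        outer:
        for (int i = 0; i + n.length <= haystack.length; i++) {
            for (int j = 0; j < n.length; j++) {
                if (haystack[i + j] != n[j]) {
                    continue outer;
                }
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new Splitter());
        System.out.println("bytes: " + bytes.length);
        System.out.println("class name in stream: " + contains(bytes, Splitter.class.getName()));
        System.out.println("field name in stream: " + contains(bytes, "separator"));
        System.out.println("method name in stream: " + contains(bytes, "apply"));
        Splitter copy = (Splitter) deserialize(bytes);
        System.out.println("tokens: " + copy.apply("a b  c").length);
    }
}
```

The search for "apply" comes back false. Whatever apply() does after deserialization is decided by the Splitter class that the reading JVM has loaded.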
The fix is simply to rebuild "HelloFlink-0.0.1.jar".
Alternatively, make the driver and Flink use the same code, for example by running the driver from the jar itself:
java -classpath "D:\IdeaWorkStations\HelloFlink\target\HelloFlink-0.0.1.jar;" com.hx.test.Test01WordCount
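Adding a field to MyFlatMapMapper
Now add a field "x" to MyFlatMapMapper and run Test01WordCount from IDEA again.
This time a deserialization error occurs.
Deserialization happens on the TaskExecutor, so the error travels from the TaskExecutor to the JobMaster and then back to the Driver.
The cause is a mismatch between two versions of the class. The MyFlatMapMapper object serialized into the JobGraph does not match the MyFlatMapMapper class that the TaskExecutor loads from HelloFlink-0.0.1.jar, so deserialization fails.
The fix is to keep the class on the Driver side consistent with the class in HelloFlink-0.0.1.jar, in the same way as above.
The error message is as follows: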
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fb0f2f5ca43429f3b400c338f2610466)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at com.hx.test.Test01WordCount.main(Test01WordCount.java:43)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 10 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.io.InvalidClassException: com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more
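The InvalidClassException at the bottom of this trace can be reproduced without Flink. Below is a minimal sketch that uses a hypothetical Mapper class with no explicit serialVersionUID. It serializes an instance, then overwrites the serialVersionUID recorded in the stream. In the class descriptor, those are the 8 bytes that directly follow the class name. This simulates a writer whose class had a different structure. Patching the stream is only an illustration trick, not something Flink does. Reading the stream back then fails with the same "local class incompatible" message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class SuidMismatchDemo {

    // Hypothetical stand-in for MyFlatMapMapper: Serializable, no explicit serialVersionUID,
    // so the value is computed by ObjectStreamClass.computeDefaultSUID.
    static class Mapper implements Serializable {
        int x = 1;
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // In the class descriptor, the class name (writeUTF) is followed by the 8-byte serialVersionUID.
    static int suidOffset(byte[] stream, String className) {
        byte[] name = className.getBytes(StandardCharsets.UTF_8);
        outer:
        for (int i = 0; i + name.length + 8 <= stream.length; i++) {
            for (int j = 0; j < name.length; j++) {
                if (stream[i + j] != name[j]) {
                    continue outer;
                }
            }
            return i + name.length;
        }
        throw new IllegalStateException("class name not found in stream");
    }

    static String run() throws Exception {
        byte[] bytes = serialize(new Mapper());
        int off = suidOffset(bytes, Mapper.class.getName());
        // Pretend the writer's class differed: change the recorded UID by flipping one bit.
        bytes[off + 7] ^= 1;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject();
            return "deserialized without error";
        } catch (InvalidClassException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("local UID: " + ObjectStreamClass.lookup(Mapper.class).getSerialVersionUID());
        System.out.println(run());
    }
}
```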
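The key line is:
com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
Whether the two sides agree depends on how ObjectStreamClass.computeDefaultSUID computes the value.
If a class does not declare something like "private static final long serialVersionUID = 42L;", its serialVersionUID is computed by ObjectStreamClass.computeDefaultSUID.
For the exact implementation of ObjectStreamClass.computeDefaultSUID, see the corresponding JDK source. Roughly, it writes the following into a stream:
- the class name;
- the class modifiers;
- the sorted names of the implemented interfaces;
- the fields, except private static and private transient ones (name, modifiers, type descriptor);
- whether a static initializer exists;
- the non-private constructors and methods (name, modifiers, signature).
It then hashes that stream with SHA-1 and builds the long value from the first 8 bytes of the digest.
Adding field "x" adds an entry to the field list, unless x is private static or private transient, so the computed value changes. Method bodies are not part of the hash. This is why adding "Thread.sleep(3_000)" in the previous section left serialVersionUID unchanged, and why the stale class from the jar was used without any error.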
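Declaring serialVersionUID explicitly is the alternative mentioned above, and its effect can be checked with ObjectStreamClass.lookup. The sketch below assumes two hypothetical classes, one with "private static final long serialVersionUID = 42L;" and one without. The first always reports 42, whatever fields are added later. The second reports whatever computeDefaultSUID derives from its structure. Pinning the UID only removes the InvalidClassException. The TaskExecutor still runs the class from the jar, as the Thread.sleep case shows, so the jar still has to be kept in sync.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ExplicitSuidDemo {

    // Hypothetical class with a pinned serialVersionUID: adding fields later keeps it at 42.
    static class Pinned implements Serializable {
        private static final long serialVersionUID = 42L;
        int x;
    }

    // Hypothetical class without one: the value comes from computeDefaultSUID and
    // changes when the class structure (fields, non-private methods, ...) changes.
    static class Computed implements Serializable {
        int x;
    }

    static long suidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("Pinned:   " + suidOf(Pinned.class));
        System.out.println("Computed: " + suidOf(Computed.class));
    }
}
```

Pinned prints 42. The Computed value depends on the class's name and structure, so it is not reproduced here.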
The end.