07 Problems caused by a mismatch between the development-environment code and the code in the jar


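Preface

The setup is as follows. This discussion assumes a development environment in which you are writing the job code yourself.

Concretely: in the HelloFlink project, the driver code in Test01WordCount is modified, but HelloFlink-0.0.1.jar is not rebuilt.

Test01WordCount is then run from IDEA via right-click, and the sections below look at the discrepancies this produces.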

 

 

Test case

/**
 * com.hx.test.Test01WordCount
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2021/4/12 10:14
 */
public class Test01WordCount {

    // com.hx.test.Test01WordCount
    // -Xmx100M -XX:+UseSerialGC -XX:+TraceClassUnloading
    public static void main(String[] args) throws Exception {

//        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String jarPath = "D:\\IdeaWorkStations\\HelloFlink\\target\\HelloFlink-0.0.1.jar";
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("127.0.0.1", 8081, jarPath);
        env.setParallelism(1);

        String inputPath = "D:\\IdeaWorkStations\\HelloFlink\\src\\main\\resources\\Test01WordCount.txt";
        DataSource<String> inputDs = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> result = inputDs
                .flatMap(new MyFlatMapMapper())
                .map(new MyMapMapper())
                .groupBy(0)
                .sum(1);
        result.print();

        System.gc();
        System.in.read();
        System.out.println(" end ");

    }

    /**
     * MyFlatMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:24
     */
    private static class MyFlatMapMapper implements FlatMapFunction<String, String> {
        private static List<byte[]> dummyBytes = new ArrayList<>();

        static {
            try {
                for (int i = 0; i < 10; i++) {
                    byte[] tmpBytes = FileUtils.readAllBytes(Paths.get("D:\\IdeaWorkStations\\HelloFlink\\target\\logs\\ROOT.2021-12-27-9.log"));
                    dummyBytes.add(tmpBytes);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                new Thread(new MyRunnable()).start();
            }
        }

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            String[] splits = line.split("\\s+");
            for (String split : splits) {
                out.collect(split);
            }
        }
    }

    /**
     * MyRunnable
     *
     * @author Jerry.X.He
     * @return
     * @date 2021/12/27 16:16
     */
    public static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.err.println(" MyRunnable.run before ");
            IoUtils.sleep(1000_000);
            System.err.println(" MyRunnable.run after ");
        }
    }

    /**
     * MyMapMapper
     *
     * @author Jerry.X.He
     * @version 1.0
     * @date 2021/4/12 10:29
     */
    private static class MyMapMapper implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    }

}

 

 

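Changing the code in MyFlatMapMapper.flatMap

Add the line “Thread.sleep(3_000)” and run Test01WordCount via right-click.

You will find that the added line has no effect at all.

Recall the earlier discussion of how processing functions are passed along. On the path from the Driver to the JobManager to the TaskExecutor, two things are shipped: the JobGraph, which the client generates from the driver code, and “HelloFlink-0.0.1.jar”.

What the TaskConfiguration carries is a serialized instance of MyFlatMapMapper, not its code. The TaskExecutor loads the MyFlatMapMapper class from HelloFlink-0.0.1.jar and deserializes the instance against that class. The flatMap method that actually runs is therefore the one in HelloFlink-0.0.1.jar, which is still the version from before “Thread.sleep(3_000)” was added.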
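The point that a serialized instance carries state rather than code can be checked with plain Java serialization, which is what the ObjectInputStream frames in the stack trace further down indicate is in use here. The following is a minimal sketch, not Flink code: Greeter is a hypothetical stand-in for a user function such as MyFlatMapMapper, and serialize and contains are helper names introduced only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SerializedFormDemo {

    /** Hypothetical stand-in for a user function such as MyFlatMapMapper. */
    public static class Greeter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String prefix;

        public Greeter(String prefix) {
            this.prefix = prefix;
        }

        public String greetLoudly(String name) {
            return prefix + name.toUpperCase();
        }
    }

    /** Serializes an object with plain Java serialization. */
    public static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Returns true if the ASCII text occurs somewhere in the byte array. */
    public static boolean contains(byte[] data, String text) {
        return new String(data, StandardCharsets.ISO_8859_1).contains(text);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Greeter("hello, "));
        // The stream names the class and stores the field values ...
        System.out.println("class name in stream:  " + contains(bytes, Greeter.class.getName()));
        System.out.println("field value in stream: " + contains(bytes, "hello, "));
        // ... but method bodies are not part of it, so the receiver needs its own copy of the class.
        System.out.println("method name in stream: " + contains(bytes, "greetLoudly"));
    }
}
```

The stream identifies the class by name and stores its field values, while the method greetLoudly does not appear in it. Whoever deserializes the bytes has to supply the class definition, and in the scenario above that definition comes from the jar.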

The fix is simply to rebuild “HelloFlink-0.0.1.jar”.
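Because a forgotten rebuild is easy to miss, one option is a small guard that runs before the job is submitted. The sketch below is only an illustration under assumptions: it compares file timestamps, the class name JarFreshnessCheck and the method isJarStale are hypothetical, and the default paths assume a Maven layout in which the IDE compiles into target/classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.stream.Stream;

public class JarFreshnessCheck {

    /**
     * Returns true if any regular file under classesDir was modified after the jar,
     * which suggests the jar was not rebuilt since the last compile.
     */
    public static boolean isJarStale(Path jar, Path classesDir) {
        try (Stream<Path> files = Files.walk(classesDir)) {
            FileTime jarTime = Files.getLastModifiedTime(jar);
            return files
                    .filter(Files::isRegularFile)
                    .anyMatch(file -> lastModified(file).compareTo(jarTime) > 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static FileTime lastModified(Path file) {
        try {
            return Files.getLastModifiedTime(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default paths are assumptions (Maven layout); pass your own as arguments.
        Path jar = Paths.get(args.length > 0 ? args[0] : "target/HelloFlink-0.0.1.jar");
        Path classes = Paths.get(args.length > 1 ? args[1] : "target/classes");
        if (!Files.exists(jar) || !Files.isDirectory(classes)) {
            System.err.println("Nothing to compare: " + jar + " or " + classes + " is missing");
        } else if (isJarStale(jar, classes)) {
            System.err.println("Jar is older than the compiled classes; rebuild it before submitting");
        } else {
            System.out.println("Jar is at least as new as the compiled classes");
        }
    }
}
```

A timestamp comparison is only a heuristic. It flags the common case of editing and recompiling without repackaging, but it does not compare the class contents themselves.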


 

Alternatively, make the driver and Flink use the same code by running the driver from the jar itself, as follows:

java -classpath "D:\IdeaWorkStations\HelloFlink\target\HelloFlink-0.0.1.jar;" com.hx.test.Test01WordCount

 

 

Adding a field to MyFlatMapMapper

Now add a field “x” to MyFlatMapMapper and run Test01WordCount via right-click.

This time a deserialization error is raised.


 

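The error is shown below.

Deserialization happens on the TaskExecutor, so the error travels from the TaskExecutor to the JobMaster and then back to the Driver.

The error itself says that the MyFlatMapMapper object serialized into the JobGraph is incompatible with the MyFlatMapMapper class that the TaskExecutor loaded from HelloFlink-0.0.1.jar, so deserialization failed.

The fix is to keep the class on the Driver side identical to the one in HelloFlink-0.0.1.jar, using either of the approaches described above.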

Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fb0f2f5ca43429f3b400c338f2610466)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
	at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
	at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
	at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
	at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
	at com.hx.test.Test01WordCount.main(Test01WordCount.java:43)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
	... 10 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
	at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
	at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
	at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
	... 3 more
Caused by: java.io.InvalidClassException: com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
	... 9 more

 

 

com.hx.test.Test01WordCount$MyFlatMapMapper; local class incompatible: stream classdesc serialVersionUID = -497632544534637338, local class serialVersionUID = 2227027274844323451

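Why the two values differ comes down to how ObjectStreamClass.computeDefaultSUID computes them.

If a class does not declare something like “private static final long serialVersionUID = 42L;”, its serialVersionUID is computed by ObjectStreamClass.computeDefaultSUID.

In outline, ObjectStreamClass.computeDefaultSUID works as follows; see the JDK source for the details.

It hashes the class name, the class modifiers, and the names of the implemented interfaces, together with the class's fields, constructors, and methods. That is why adding the field “x” changes the computed value.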
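The difference between a derived and a declared serialVersionUID can be observed through the public API ObjectStreamClass.lookup(...).getSerialVersionUID(). This is a minimal sketch: Derived and Pinned are hypothetical classes, and uidOf is a helper name introduced only for this illustration.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    /** No explicit serialVersionUID: the value is derived from the class structure. */
    public static class Derived implements Serializable {
        private int count;
    }

    /** Explicit serialVersionUID: the declared value is used as-is. */
    public static class Pinned implements Serializable {
        private static final long serialVersionUID = 42L;
        private int count;
    }

    /** Returns the serialVersionUID that Java serialization associates with the class. */
    public static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("Derived: " + uidOf(Derived.class));
        System.out.println("Pinned:  " + uidOf(Pinned.class));
    }
}
```

Declaring the field explicitly would keep the value stable when the class structure changes, but that only suppresses the version check. The TaskExecutor would still load the class from the jar, so rebuilding the jar remains the actual fix.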
