0
点赞
收藏
分享

微信扫一扫

分布式定时任务框架 Elastic-Job (2) Spring / Spring Boot 集成

分布式定时任务框架Elastic-Job 二 Spring / Spring Boot 整合

一、Spring 集成

1.1 创建Spring项目

IDEA为例:文件 -> 新建项目 -> Maven -> 原型中创建 -> org.apache.maven.archetypes:maven-archetype-webapp

1.2 添加依赖

添加spring 依赖

<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>5.3.14</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-web</artifactId>
  <version>5.3.14</version>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <version>1.18.22</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.32</version>
</dependency>

1.3 添加Elastic-Job依赖

<dependency>
  <groupId>com.dangdang</groupId>
  <artifactId>elastic-job-lite-core</artifactId>
  <version>2.1.5</version>
</dependency>
<dependency>
  <groupId>com.dangdang</groupId>
  <artifactId>elastic-job-lite-spring</artifactId>
  <version>2.1.5</version>
</dependency>

1.4 配置

配置web.xml

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
  <display-name>Spring elastic-job</display-name>
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath*:spring-config.xml</param-value>
  </context-param>
  
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
</web-app>

配置spring-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.dangdang.com/schema/ddframe/reg
       http://www.dangdang.com/schema/ddframe/reg/reg.xsd
       http://www.dangdang.com/schema/ddframe/job
       http://www.dangdang.com/schema/ddframe/job/job.xsd
">
    <!--注册中心配置-->
    <reg:zookeeper server-lists="localhost:2181" namespace="spring-elasticjob" id="zkCenter"/>
    <!-- simple作业配置 -->
    <job:simple registry-center-ref="zkCenter" cron="0/10 * * * * ?" sharding-total-count="2" class="cn.flowboot.job.MySimpleJob"/>
</beans>


</beans>

package cn.flowboot.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalTime;


/**
 * <h1>My Simple Job</h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/19
 */
@Slf4j
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("SimpleJob %s 当前分片项 %d,总分片项 %d\n", LocalTime.now(),shardingContext.getShardingItem(),shardingContext.getShardingTotalCount());

    }
}

Spring Boot 集成

创建Spring Boot项目

IDEA为例:文件 -> 新建项目 -> Spring Initializr

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!--配置-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!-- elastic-job-lite-->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
   <groupId>com.dangdang</groupId>
   <artifactId>elastic-job-lite-spring</artifactId>
   <version>2.1.5</version>
</dependency>
<!-- 用于测试 -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
    <scope>test</scope>
</dependency>
<!-- 用于日志和实体 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.32</version>
</dependency>

属性类

ZookeeperProperties.java

package cn.flowboot.simple.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/19
 */
@Data
@ConfigurationProperties(prefix = "elastic-job.zookeeper")
public class ZookeeperProperties {

    /**
     * zookeeper服务器地址
     */
    private String server = "localhost:2181";
    /**
     * zookeeper命名空间
     */
    private String namespace = "elastic-job-zookeeper";
    /**
     * 等待重试的间隔时间的初始值 默认1000,单位:毫秒
     */
    private Integer baseSleepTimeMilliseconds = 1000;
    /**
     * 等待重试的间隔时间的最大值 默认3000,单位:毫秒
     */
    private Integer maxSleepTimeMilliseconds = 3000;
    /**
     * 最大重试次数 默认3
     */
    private Integer maxRetries = 3;
    /**
     * 会话超时时间 默认60000,单位:毫秒
     */
    private Integer sessionTimeoutMilliseconds = 60000;
    /**
     * 连接超时时间 默认15000,单位:毫秒
     */
    private Integer  connectionTimeoutMilliseconds = 15000;
}

自动配置

Zookeeper自动配置

ZookeeperAutoConfig.java

package cn.flowboot.aotuconfig;

import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Configuration
@ConditionalOnProperty("elastic-job.zookeeper.server")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {

    @Autowired
    private ZookeeperProperties zookeeperProperties;
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter(){
        ZookeeperConfiguration zc = new ZookeeperConfiguration(zookeeperProperties.getServer(),zookeeperProperties.getNamespace());
        zc.setMaxSleepTimeMilliseconds(zookeeperProperties.getMaxSleepTimeMilliseconds());
        zc.setBaseSleepTimeMilliseconds(zookeeperProperties.getBaseSleepTimeMilliseconds());
        zc.setConnectionTimeoutMilliseconds(zookeeperProperties.getConnectionTimeoutMilliseconds());
        zc.setSessionTimeoutMilliseconds(zookeeperProperties.getSessionTimeoutMilliseconds());
        zc.setMaxRetries(zookeeperProperties.getMaxRetries());
        //初始化 交给Spring执行
        //crc.init();
        return new ZookeeperRegistryCenter(zc);
    }
}


修改为可以作为外部jar

根据Spring Boot 文档中 49.Creating Your Own Auto-configuration
49.Creating Your Own Auto-configuration
来自Edge网页翻译插件
49.自动配置
根据49.2对项目进行改造

将自动配置文件移动到启动类以外的包,涉及类如下

//原包路径cn.flowboot.simple.config下
cn.flowboot.aotuconfig.ZookeeperProperties
cn.flowboot.aotuconfig.ZookeeperAutoConfig
cn.flowboot.aotuconfig.ElasticJobProperties

在资源目录下创建META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.flowboot.aotuconfig.ZookeeperAutoConfig

Elastic-Job 自动配置

编写注解,注意这个编写在启动类之外包中,和zookeeper同一个位置即可

SimpleJob 配置

package cn.flowboot.aotuconfig;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {

    /**
     * 任务名称
     */
    String name() default  "";
    /**
     * cron表达式,用于控制作业触发时间,默认每间隔10秒钟执行一次
     */
    String cron() default  "";
    /**
     * 作业分片总数
     */
    int shardingTotalCount() default 1;

    /**
     * 是否可覆盖
     */
    boolean override() default false;

}

编写MySimpleJob

package cn.flowboot.simple.job;

import cn.flowboot.aotuconfig.ElasticSimpleJob;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;

import java.time.LocalTime;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/18
 */
@ElasticSimpleJob(name = "mySimpleJob",cron = "0/10 * * * * ?",shardingTotalCount = 2,override = true)
public class MySimpleJob implements SimpleJob {


    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.printf("SimpleJob %s 当前分片项 %d,总分片项 %d\n", LocalTime.now(),shardingContext.getShardingItem(),shardingContext.getShardingTotalCount());
    }
}

SimpleJobAutoConfig.java

package cn.flowboot.aotuconfig;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    /**
     * 获取Spring上下文
     */
    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    /**
     * @PostConstruct 在对象加载完依赖注入后执行
     * 初始化 Simple Job
     */
    @PostConstruct
    private void initSimpleJob(){

        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);

        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> superInterface : interfaces) {
                //判断是否是实例
                if (superInterface == SimpleJob.class){
                    LiteJobConfiguration configuration = initElasticSimpleJob(instance);
                    new SpringJobScheduler((ElasticJob) instance,zkCenter,configuration).init();
                }
            }
        }

    }

    private LiteJobConfiguration initElasticSimpleJob(Object instance) {
        ElasticSimpleJob elasticSimpleJob = instance.getClass().getAnnotation(ElasticSimpleJob.class);
        String jobName = elasticSimpleJob.name();
        String cron = elasticSimpleJob.cron();
        int shardingTotalCount = elasticSimpleJob.shardingTotalCount();
        boolean override = elasticSimpleJob.override();
        //校验参数是否存在
        verificationAttribute(jobName, cron, shardingTotalCount);
        //job 核心配置
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder(jobName,cron,shardingTotalCount)
                .build();
        //job 类配置
        JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());

        //job 根的配置 (LiteJobConfiguration)
        return LiteJobConfiguration
                .newBuilder(jtc)
                //覆盖配置
                .overwrite(override)
                .build();
    }

    private void verificationAttribute(String jobName, String cron, int shardingTotalCount) {
        if (StringUtils.isBlank(jobName)){
            throw new RuntimeException("ElasticSimpleJob:The attribute of name cannot be empty ");
        }
        if (StringUtils.isBlank(cron)){
            throw new RuntimeException("ElasticSimpleJob:The attribute of cron cannot be empty ");
        }
        if (shardingTotalCount <= 0){
            throw new RuntimeException("The attribute of shardingTotalCount cannot be less than or equal to 0 ");
        }
    }
}

配置添加
META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.flowboot.aotuconfig.ZookeeperAutoConfig,\
cn.flowboot.aotuconfig.SimpleJobAutoConfig

Dataflow配置

创建@ElasticDataflowJob 注解

package cn.flowboot.aotuconfig;

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@ElasticSimpleJob
public @interface ElasticDataflowJob {

    /**
     * 任务名称
     */
    String name() default  "";
    /**
     * cron表达式,用于控制作业触发时间,默认每间隔10秒钟执行一次
     */
    String cron() default  "";
    /**
     * 作业分片总数
     */
    int shardingTotalCount() default 1;

    /**
     * 是否可覆盖
     */
    boolean override() default false;

    /**
     * 是否流式处理
     */
    boolean streamingProcess() default false;

}

使用

package cn.flowboot.simple.job;

import cn.flowboot.aotuconfig.ElasticDataflowJob;
import cn.flowboot.simple.model.Order;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/18
 */
@ElasticDataflowJob(name = "myDataflowJob",cron = "0/10 * * * * ?",shardingTotalCount = 2,override = true,streamingProcess = true)
public class MyDataflowJob implements DataflowJob<Order> {

    private List<Order> orders = new ArrayList<>();

    /**
     * 模拟订单数据
     */
    {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i+1);
            order.setStatus(0);
            orders.add(order);
        }
    }

    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        //订单号 % 分片总数 == 当前分片项
        List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
                .collect(Collectors.toList());

        //只处理前10个
        if (orderList.size() >0){
            orderList = orderList.subList(0,10);
        }
        //模拟耗时
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("DataflowJob %s 当前分片项 %d,抓取数据是 %s\n", LocalTime.now(),shardingContext.getShardingItem(),orderList);

        return orderList;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<Order> list) {
        System.out.printf("DataflowJob %s 当前分片项 %d 正在处理数据...\n", LocalTime.now(),shardingContext.getShardingItem());
        list.forEach(o->o.setStatus(1));
        System.out.println(list+"\n"+orders);
        //模拟耗时
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

简单方法

与SimpleJobAutoConfig 基本一致,核心JobTypeConfiguration 实例为DataflowJobConfiguration,其中第三个参数streamingProcess在注解中定义,可克隆SimpleJobAutoConfig @ElasticSimpleJob 修改为@ElasticDataflowJob

//job 类配置
JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(),streamingProcess);
重构

减少代码冗余,两者相似简单抽出公共部分,其中verificationAttribute为公共工具,initInstance为启动入口,传递类型和注解类型,需要实现initElasticJob,不同类型不同实现方法,做个简单的父类,简化过于冗余的代码

package cn.flowboot.aotuconfig;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.lang.annotation.Annotation;
import java.util.Map;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Getter
public abstract class AbstractJobAutoConfig {

    /**
     * 获取Spring上下文
     */
    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    /**
     * @PostConstruct 在对象加载完依赖注入后执行
     * 初始化 Simple Job
     */
    public void initInstance(Class<?> jobClass,Class<? extends Annotation> jobAnnotationClass){

        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(jobAnnotationClass);

        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> superInterface : interfaces) {
                //判断是否是实例
                if (superInterface == jobClass){
                   	 initElasticJob(instance);
                }
            }
        }

    }

    public abstract void initElasticJob(Object instance) ;

    public void verificationAttribute(String jobName, String cron, int shardingTotalCount) {
        if (StringUtils.isBlank(jobName)){
            throw new RuntimeException("ElasticSimpleJob:The attribute of name cannot be empty ");
        }
        if (StringUtils.isBlank(cron)){
            throw new RuntimeException("ElasticSimpleJob:The attribute of cron cannot be empty ");
        }
        if (shardingTotalCount <= 0){
            throw new RuntimeException("The attribute of shardingTotalCount cannot be less than or equal to 0 ");
        }
    }
}

重构SimpleJobAutoConfig

package cn.flowboot.aotuconfig;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.lang.annotation.Annotation;
import java.util.Map;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig extends AbstractJobAutoConfig{


    /**
     * @PostConstruct 在对象加载完依赖注入后执行
     * 初始化 Simple Job
     */
    @PostConstruct
    private void initSimpleJob(){
            initInstance(SimpleJob.class,ElasticSimpleJob.class);
    }


    @Override
    public void initElasticJob(Object instance) {
        ElasticSimpleJob elasticSimpleJob = instance.getClass().getAnnotation(ElasticSimpleJob.class);
        String jobName = elasticSimpleJob.name();
        String cron = elasticSimpleJob.cron();
        int shardingTotalCount = elasticSimpleJob.shardingTotalCount();
        boolean override = elasticSimpleJob.override();
        //校验参数是否存在
        verificationAttribute(jobName, cron, shardingTotalCount);
        //job 核心配置
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder(jobName,cron,shardingTotalCount)
                .build();
        //job 类配置
        JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, instance.getClass().getCanonicalName());

        //job 根的配置 (LiteJobConfiguration)
        LiteJobConfiguration configuration = LiteJobConfiguration
                .newBuilder(jtc)
                //覆盖配置
                .overwrite(override)
                .build();
         new SpringJobScheduler((ElasticJob) instance,getZkCenter(),configuration).init();
    }


}

DataflowJobAutoConfig 编写

package cn.flowboot.aotuconfig;

import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Map;

/**
 * <h1></h1>
 *
 * @version 1.0
 * @author: Vincent Vic
 * @since: 2022/01/20
 */
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public  class DataflowJobAutoConfig  extends AbstractJobAutoConfig{


    /**
     * @PostConstruct 在对象加载完依赖注入后执行
     * 初始化 Simple Job
     */
    @PostConstruct
    private void initSimpleJob(){
        initInstance(DataflowJob.class,ElasticDataflowJob.class);
    }


    @Override
    public LiteJobConfiguration initElasticJob(Object instance) {
        ElasticDataflowJob elasticDataflowJob = instance.getClass().getAnnotation(ElasticDataflowJob.class);
        String jobName = elasticDataflowJob.name();
        String cron = elasticDataflowJob.cron();
        int shardingTotalCount = elasticDataflowJob.shardingTotalCount();
        boolean override = elasticDataflowJob.override();
        boolean streamingProcess = elasticDataflowJob.streamingProcess();
        //校验参数是否存在
        verificationAttribute(jobName, cron, shardingTotalCount);
        //job 核心配置
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder(jobName,cron,shardingTotalCount)
                .build();
        //job 类配置
        JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc, instance.getClass().getCanonicalName(),streamingProcess);

        //job 根的配置 (LiteJobConfiguration)
        LiteJobConfiguration configuration =  LiteJobConfiguration
                .newBuilder(jtc)
                //覆盖配置
                .overwrite(override)
                .build();
         new SpringJobScheduler((ElasticJob) instance,getZkCenter(),configuration).init();
    }


}

重构设计还欠缺很多

举报

相关推荐

0 条评论