ElasticJob-Lite: Dataflow Jobs

吴wuwu, 2022-02-12

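A Dataflow job processes a stream of data and must implement the DataflowJob interface. The interface has two methods: fetchData, which fetches the data, and processData, which processes it.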
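This plain-Java model shows the fetch-then-process contract without depending on ElasticJob. SimpleDataflowJob, OneOffDriver and RecordingJob are hypothetical names, not ElasticJob classes. The model makes several assumptions:

- The job runs in the default non-streaming mode.
- For each trigger, data is fetched once per sharding item.
- processData is called only when fetchData returned a non-empty list.
- Items are run one after another for simplicity. ElasticJob may spread them across instances or process them in parallel threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for DataflowJob<T>: same two-step contract, but the
// sharding item is passed as a plain int instead of a ShardingContext.
interface SimpleDataflowJob<T> {
    List<T> fetchData(int shardingItem);

    void processData(int shardingItem, List<T> data);
}

// Hypothetical driver for one trigger in non-streaming mode: fetch once per
// sharding item and call processData only when something was fetched.
// Items run sequentially here for simplicity.
final class OneOffDriver {
    static <T> void trigger(SimpleDataflowJob<T> job, int shardingTotalCount) {
        for (int item = 0; item < shardingTotalCount; item++) {
            List<T> data = job.fetchData(item);
            if (data != null && !data.isEmpty()) {
                job.processData(item, data);
            }
        }
    }
}

// Example job that records what it processed so the call sequence can be checked.
final class RecordingJob implements SimpleDataflowJob<String> {
    final List<String> processed = new ArrayList<>();

    @Override
    public List<String> fetchData(int shardingItem) {
        if (shardingItem == 1) {
            return Collections.emptyList(); // nothing pending for item 1
        }
        return Arrays.asList("item" + shardingItem + "-a", "item" + shardingItem + "-b");
    }

    @Override
    public void processData(int shardingItem, List<String> data) {
        processed.addAll(data);
    }
}
```

After OneOffDriver.trigger(job, 3) on a fresh RecordingJob, job.processed holds [item0-a, item0-b, item2-a, item2-b]. Item 1 returned an empty list, so its processData call was skipped.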

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import reactor.core.publisher.Flux;

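import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:02
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
public class MyDataflowJob implements DataflowJob<Flux<String>> {
    // DateTimeFormatter is immutable and thread-safe. SimpleDateFormat is not, and the
    // sharding items assigned to one instance can be processed by several threads at once.
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // One string array per sharding item (0, 1, 2).
    private static final String[][] message = {
            {"java", "c", "c++", "python", "go"},
            {"docker", "k8s"},
            {"elastic-job", "elasticsearch", "zookeeper", "spring cloud alibaba"}
    };

    @Override
    public List<Flux<String>> fetchData(ShardingContext shardingContext) {
        // Wrap the array that belongs to this sharding item in a Flux.
        int item = shardingContext.getShardingItem();
        return Collections.singletonList(Flux.fromArray(message[item]));
    }

    @Override
    public void processData(ShardingContext shardingContext, List<Flux<String>> list) {
        // Build the whole block first and print it with a single call, so output from
        // sharding items running in parallel does not interleave line by line.
        StringBuilder out = new StringBuilder();
        out.append("-------------------------------------------------------------\n");
        out.append(LocalDateTime.now().format(formatter)).append('\n');
        out.append(shardingContext.getShardingParameter()).append('\n');
        list.forEach(data -> appendData(out, data));
        System.out.print(out);
    }

    // Sorts the Flux in natural (lexicographic) order and appends one element per line.
    private static void appendData(StringBuilder out, Flux<String> data) {
        data.sort().toStream().forEach(s -> out.append(s).append('\n'));
    }
}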

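The Flux abstract class comes from the Reactor project, which Spring's reactive programming support is built on. Reactive programming is a declarative paradigm built around data streams and the propagation of change. It is not covered in detail here.

The code should be easy to follow:

- fetchData uses the current sharding item to pick the corresponding string array.
- processData prints that array's strings in ascending lexicographic order.
- The getShardingParameter method returns the sharding parameter configured for the current sharding item.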
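The following sketch checks which order each sharding item prints, without ZooKeeper or Reactor. It reproduces the sorting step in plain Java. ShardOutputPreview is a hypothetical class. It assumes that Flux.sort() orders Strings by their natural ordering (String.compareTo), which is also what List.sort(null) uses here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Reactor-free preview of what MyDataflowJob prints for each sharding item.
final class ShardOutputPreview {
    // Same data as the message array in MyDataflowJob.
    static final String[][] MESSAGE = {
            {"java", "c", "c++", "python", "go"},
            {"docker", "k8s"},
            {"elastic-job", "elasticsearch", "zookeeper", "spring cloud alibaba"}
    };

    static List<String> sortedFor(int shardingItem) {
        // Copy before sorting so the shared array is left untouched.
        List<String> copy = new ArrayList<>(Arrays.asList(MESSAGE[shardingItem]));
        copy.sort(null); // null comparator = natural ordering (String.compareTo)
        return copy;
    }

    public static void main(String[] args) {
        for (int item = 0; item < MESSAGE.length; item++) {
            System.out.println(item + " -> " + sortedFor(item));
        }
    }
}
```

Running main prints:

0 -> [c, c++, go, java, python]
1 -> [docker, k8s]
2 -> [elastic-job, elasticsearch, spring cloud alibaba, zookeeper]

"c" sorts before "c++" because it is a prefix of it. "elastic-job" sorts before "elasticsearch" because '-' has a smaller code point than 's'.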

The Reactor dependency (Maven):

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.8.RELEASE</version>
</dependency>

package com.kaven.job;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 17:05
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */
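public class Application {
    public static void main(String[] args) {
        // Register the job with the registry center and schedule it by its cron expression.
        new ScheduleJobBootstrap(createRegistryCenter(), new MyDataflowJob(),
                createJobConfiguration()).schedule();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        // ZooKeeper server list and the namespace used for this job's data.
        ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.184:9000", "my-job");
        zc.setConnectionTimeoutMilliseconds(40000);
        zc.setMaxRetries(5);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);
        regCenter.init();
        return regCenter;
    }

    private static JobConfiguration createJobConfiguration() {
        // Job name "MyDataflowJob" with 3 sharding items (0, 1, 2).
        return JobConfiguration.newBuilder("MyDataflowJob", 3)
                // 0 = programming languages, 1 = container technologies, 2 = frameworks
                .shardingItemParameters("0=程序语言,1=容器技术,2=框架")
                .description("数据流作业") // "Dataflow job"
                // Fire at second 30 of every minute (Quartz cron syntax).
                .cron("30 * * * * ?")
                // Let this local configuration overwrite the one stored in the registry center.
                .overwrite(true)
                .monitorExecution(false)
                .misfire(true)
                .build();
    }
}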

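The shardingItemParameters method maps sharding items to sharding parameters. The format works as follows:

- A sharding item and its parameter are joined by an equals sign.
- Multiple pairs are separated by commas.
- Sharding items are numbered from 0.

The monitorExecution method controls whether execution monitoring is enabled. It is on by default (the default value is true).

With monitoring enabled, data is written to ZooKeeper on every cron trigger so that the same data is not fetched twice. For short-interval jobs this increases the load on ZooKeeper and degrades its performance, so disabling monitorExecution is recommended there.

If monitorExecution is disabled, two things are lost. There is no guarantee that data will not be fetched more than once, and failover is not available. The job must therefore be idempotent.

For long-interval jobs, enabling monitorExecution is recommended so that each piece of data is fetched only once.

The misfire method controls whether missed executions are re-run. It is also on by default (the default value is true). ElasticJob does not allow executions of the same job to overlap. If an execution takes longer than the job's trigger interval for any reason, misfire ensures that the overdue execution runs as soon as the previous one has finished.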
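The shardingItemParameters format described above can be illustrated with a small parser. ShardingItemParams is a hypothetical helper, not part of ElasticJob's API. ElasticJob does its own parsing internally, and this sketch only mirrors the "item=parameter" pairs, comma separators, and zero-based items.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the "item=parameter,item=parameter" format (illustration only).
final class ShardingItemParams {
    static Map<Integer, String> parse(String spec) {
        Map<Integer, String> result = new LinkedHashMap<>(); // keeps declaration order
        if (spec == null || spec.trim().isEmpty()) {
            return result;
        }
        // Pairs are separated by commas.
        for (String pair : spec.split(",")) {
            // Item and parameter are separated by the first '='.
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected item=parameter, got: " + pair);
            }
            int item = Integer.parseInt(pair.substring(0, eq).trim());
            if (item < 0) {
                throw new IllegalArgumentException("Sharding items start from 0, got: " + item);
            }
            result.put(item, pair.substring(eq + 1).trim());
        }
        return result;
    }
}
```

For the configuration in this article, parse("0=程序语言,1=容器技术,2=框架") returns {0=程序语言, 1=容器技术, 2=框架}. That is the value getShardingParameter hands to each sharding item.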
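When monitorExecution is disabled, the job must stay correct even if the same record is fetched twice. Idempotent processing handles this, and the sketch below shows one form of it. IdempotentProcessor and the record IDs are hypothetical. The in-memory Set is an assumption made only to keep the example self-contained. A real deployment would need a durable marker visible to all instances, for example a status column updated together with the side effect.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical idempotent processing step: a record ID is handled at most once,
// so a duplicate fetch does not repeat the side effect.
// NOTE: the in-memory Set is for illustration only; it is neither durable nor shared.
final class IdempotentProcessor {
    private final Set<String> processedIds = new HashSet<>();
    private final List<String> sideEffects = new ArrayList<>();

    // Returns true if the record was processed now, false if it had already been handled.
    synchronized boolean process(String recordId) {
        if (!processedIds.add(recordId)) {
            return false; // duplicate delivery: skip the side effect
        }
        sideEffects.add("handled " + recordId);
        return true;
    }

    synchronized List<String> sideEffects() {
        return new ArrayList<>(sideEffects);
    }
}
```

Feeding the IDs a, b, a, c, b into process leaves exactly three side effects: handled a, handled b, handled c.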
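The following simplified, single-threaded simulation illustrates the misfire behavior described above. MisfireModel is hypothetical and is not ElasticJob's implementation. It assumes:

- a single sharding item;
- integer time units and a fixed run duration;
- a missed trigger is remembered as a boolean flag, so several triggers missed during one run collapse into one catch-up run.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of misfire handling for one sharding item (illustration only).
// Triggers fire every `interval` units and every run takes `duration` units.
// Runs never overlap.
final class MisfireModel {
    static List<Integer> runStartTimes(int interval, int duration, int horizon, boolean misfire) {
        if (interval <= 0 || duration <= 0) {
            throw new IllegalArgumentException("interval and duration must be positive");
        }
        List<Integer> starts = new ArrayList<>();
        int busyUntil = 0;        // time at which the current run finishes
        boolean misfired = false; // a trigger was missed while a run was in progress
        for (int t = 0; t < horizon; t += interval) {
            // The previous run ended at busyUntil with a missed trigger pending,
            // so a catch-up run started right at that moment.
            if (misfired && busyUntil <= t) {
                starts.add(busyUntil);
                busyUntil += duration;
                misfired = false;
            }
            if (t < busyUntil) {
                // Still running: never overlap. Remember the trigger only if misfire is on.
                if (misfire) {
                    misfired = true;
                }
            } else {
                starts.add(t);
                busyUntil = t + duration;
            }
        }
        // Flush a catch-up run that is still pending after the last trigger.
        if (misfired && busyUntil < horizon) {
            starts.add(busyUntil);
        }
        return starts;
    }
}
```

Consider a trigger every 60 units, runs lasting 90 units, and a horizon of 300 units:

- runStartTimes(60, 90, 300, false) returns [0, 120, 240]. The triggers at 60 and 180 are simply dropped.
- runStartTimes(60, 90, 300, true) returns [0, 90, 180, 270]. Each run starts as soon as the previous one ends.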

The results are shown in the figures below:
[Figures: three screenshots of the job's run results]
That's all for Dataflow jobs. If I have gotten anything wrong, or you see things differently, feel free to leave a comment.
