0
点赞
收藏
分享

微信扫一扫

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十七)延时队列

朱悟能_9ad4 2022-01-31 阅读 39

第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息

Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。

相关源码如下所示:

org.apache.rocketmq.store.schedule.ScheduleMessageService#start

public void start() {
    if (started.compareAndSet(false, true)) {
        super.load();
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

需要注意的是,每个TimeTask在检查消息是否到期时,首先检查对应队列中尚未投递第一条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进行投递,并检查之后的消息是否到期。(因为存储延时消息的队列内部排序的规则是按照到期时间来排的,到期时间早的排前面)

第四步:将信息重新存储到CommitLog中

在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。

举报

相关推荐

0 条评论